(callback) {
+
+ @Override
+ public void completed(final SSLSession sslSession) {
+ if (callback != null) {
+ callback.completed(H2TunnelProtocolIOSession.this);
+ }
+ }
+
+ });
+
+ if (tlsSessionRef.compareAndSet(null, sslioSession)) {
+ currentSessionRef.set(sslioSession);
+ } else {
+ throw new IllegalStateException("TLS already activated");
+ }
+
+ try {
+ sslioSession.beginHandshake(this);
+ } catch (final Exception ex) {
+ if (callback != null) {
+ callback.failed(ex);
+ }
+ close(CloseMode.IMMEDIATE);
+ }
+ }
+
+ @Override
+ public TlsDetails getTlsDetails() {
+ final SSLIOSession sslIoSession = tlsSessionRef.get();
+ return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
+ }
+
+ @Override
+ public int getPendingCommandCount() {
+ return currentSessionRef.get().getPendingCommandCount();
+ }
+}
\ No newline at end of file
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java
new file mode 100644
index 0000000000..3d8d0b3d77
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java
@@ -0,0 +1,692 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.nio.support;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.StreamControl;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Raw tunnel IOSession implementation: bounded buffering + capacity updates + stream-scoped close/cancel.
+ *
+ */
+final class H2TunnelRawIOSession implements IOSession {
+
+ private static final int INBOUND_BUFFER_LIMIT = 64 * 1024;
+ private static final int OUTBOUND_BUFFER_LIMIT = 64 * 1024;
+
+ private final IOSession physicalSession;
+ private final String id;
+ private final Lock lock;
+
+ private final Deque commandQueue;
+ private final Deque inboundQueue;
+ private final Deque outboundQueue;
+
+ private final AtomicReference handlerRef;
+ private final AtomicReference dataChannelRef;
+ private final AtomicReference streamControlRef;
+
+ private CapacityChannel capacityChannel;
+ private Timeout socketTimeout;
+ private int eventMask;
+ private Status status;
+
+ private int inboundBytes;
+ private int outboundBytes;
+ private int consumedBytesSinceUpdate;
+
+ private boolean capacityInitialized;
+ private boolean localEndStreamSent;
+ private boolean remoteEndStream;
+
+ private long lastReadTime;
+ private long lastWriteTime;
+ private long lastEventTime;
+
+ H2TunnelRawIOSession(
+ final IOSession physicalSession,
+ final Timeout socketTimeout,
+ final StreamControl streamControl) {
+ this.physicalSession = physicalSession;
+ this.id = physicalSession.getId() + "-h2-tunnel";
+ this.lock = new ReentrantLock();
+ this.commandQueue = new ArrayDeque<>();
+ this.inboundQueue = new ArrayDeque<>();
+ this.outboundQueue = new ArrayDeque<>();
+ this.handlerRef = new AtomicReference<>();
+ this.dataChannelRef = new AtomicReference<>();
+ this.streamControlRef = new AtomicReference<>(streamControl);
+
+ this.capacityChannel = null;
+ this.socketTimeout = socketTimeout;
+ this.eventMask = SelectionKey.OP_READ;
+ this.status = Status.ACTIVE;
+
+ this.capacityInitialized = false;
+ this.localEndStreamSent = false;
+ this.remoteEndStream = false;
+
+ final long now = System.currentTimeMillis();
+ this.lastReadTime = now;
+ this.lastWriteTime = now;
+ this.lastEventTime = now;
+ }
+
+ void bindStreamControl(final StreamControl streamControl) {
+ streamControlRef.compareAndSet(null, streamControl);
+ }
+
+ void attachChannel(final DataStreamChannel channel) {
+ dataChannelRef.set(channel);
+ }
+
+ void updateCapacityChannel(final CapacityChannel capacityChannel) throws IOException {
+ int update = 0;
+ lock.lock();
+ try {
+ this.capacityChannel = capacityChannel;
+ if (!capacityInitialized) {
+ update += Math.max(0, INBOUND_BUFFER_LIMIT - inboundBytes);
+ capacityInitialized = true;
+ }
+ if (consumedBytesSinceUpdate > 0) {
+ update += consumedBytesSinceUpdate;
+ consumedBytesSinceUpdate = 0;
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (update > 0) {
+ capacityChannel.update(update);
+ }
+ }
+
+ void onRemoteStreamEnd() {
+ lock.lock();
+ try {
+ remoteEndStream = true;
+ if (status == Status.ACTIVE) {
+ status = Status.CLOSING;
+ }
+ if (localEndStreamSent) {
+ status = Status.CLOSED;
+ }
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void requestOutput() {
+ final DataStreamChannel dataChannel = dataChannelRef.get();
+ if (dataChannel != null) {
+ dataChannel.requestOutput();
+ }
+ }
+
+ int available() {
+ lock.lock();
+ try {
+ if (outboundBytes > 0) {
+ return outboundBytes;
+ }
+ if (!localEndStreamSent && status == Status.CLOSING) {
+ return 1;
+ }
+ if (!commandQueue.isEmpty() || (eventMask & SelectionKey.OP_WRITE) != 0) {
+ return 1;
+ }
+ return 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void appendInput(final ByteBuffer src) throws IOException {
+ if (src == null || !src.hasRemaining()) {
+ return;
+ }
+ lock.lock();
+ try {
+ if (status == Status.CLOSED) {
+ return;
+ }
+ final int remaining = src.remaining();
+ final int freeSpace = INBOUND_BUFFER_LIMIT - inboundBytes;
+ if (remaining > freeSpace) {
+ throw new IOException("Tunnel inbound buffer overflow");
+ }
+ final byte[] data = new byte[remaining];
+ src.get(data);
+ inboundQueue.addLast(ByteBuffer.wrap(data));
+ inboundBytes += data.length;
+ final long now = System.currentTimeMillis();
+ lastReadTime = now;
+ lastEventTime = now;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void discardInbound(final int bytes) throws IOException {
+ if (bytes <= 0) {
+ return;
+ }
+ int remaining = bytes;
+ int update = 0;
+ CapacityChannel currentCapacityChannel = null;
+
+ lock.lock();
+ try {
+ while (remaining > 0) {
+ final ByteBuffer buffer = inboundQueue.peekFirst();
+ if (buffer == null) {
+ break;
+ }
+ final int chunk = Math.min(remaining, buffer.remaining());
+ if (chunk <= 0) {
+ break;
+ }
+ buffer.position(buffer.position() + chunk);
+ remaining -= chunk;
+ inboundBytes -= chunk;
+ if (!buffer.hasRemaining()) {
+ inboundQueue.pollFirst();
+ }
+ consumedBytesSinceUpdate += chunk;
+ }
+ if (capacityChannel != null && consumedBytesSinceUpdate > 0) {
+ currentCapacityChannel = capacityChannel;
+ update = consumedBytesSinceUpdate;
+ consumedBytesSinceUpdate = 0;
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (currentCapacityChannel != null && update > 0) {
+ currentCapacityChannel.update(update);
+ }
+ }
+
+ void flushOutput() throws IOException {
+ final DataStreamChannel dataChannel = dataChannelRef.get();
+ if (dataChannel == null) {
+ return;
+ }
+
+ boolean sendEndStream = false;
+
+ lock.lock();
+ try {
+ for (; ; ) {
+ final ByteBuffer buffer = outboundQueue.peekFirst();
+ if (buffer == null) {
+ break;
+ }
+ final int bytesWritten = dataChannel.write(buffer);
+ if (bytesWritten <= 0) {
+ break;
+ }
+ outboundBytes -= bytesWritten;
+ if (!buffer.hasRemaining()) {
+ outboundQueue.pollFirst();
+ }
+ final long now = System.currentTimeMillis();
+ lastWriteTime = now;
+ lastEventTime = now;
+ }
+ if (!localEndStreamSent && status == Status.CLOSING && outboundQueue.isEmpty()) {
+ localEndStreamSent = true;
+ sendEndStream = true;
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (sendEndStream) {
+ try {
+ dataChannel.endStream(null);
+ } finally {
+ lock.lock();
+ try {
+ if (remoteEndStream) {
+ status = Status.CLOSED;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ private void cancelPendingCommands() {
+ for (final Command command : commandQueue) {
+ command.cancel();
+ }
+ commandQueue.clear();
+ }
+
+ private void cancelStream() {
+ final StreamControl streamControl = streamControlRef.get();
+ if (streamControl != null) {
+ streamControl.cancel();
+ }
+ }
+
+ @Override
+ public IOEventHandler getHandler() {
+ return handlerRef.get();
+ }
+
+ @Override
+ public void upgrade(final IOEventHandler handler) {
+ handlerRef.set(handler);
+ }
+
+ @Override
+ public Lock getLock() {
+ return lock;
+ }
+
+ @Override
+ public void enqueue(final Command command, final Command.Priority priority) {
+ if (command == null) {
+ return;
+ }
+ lock.lock();
+ try {
+ if (status != Status.ACTIVE) {
+ command.cancel();
+ return;
+ }
+ if (priority == Command.Priority.IMMEDIATE) {
+ commandQueue.addFirst(command);
+ } else {
+ commandQueue.addLast(command);
+ }
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ requestOutput();
+ }
+
+ @Override
+ public boolean hasCommands() {
+ lock.lock();
+ try {
+ return !commandQueue.isEmpty();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Command poll() {
+ lock.lock();
+ try {
+ return commandQueue.pollFirst();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public ByteChannel channel() {
+ return this;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return physicalSession.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return physicalSession.getLocalAddress();
+ }
+
+ @Override
+ public int getEventMask() {
+ lock.lock();
+ try {
+ return eventMask;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void setEventMask(final int ops) {
+ final boolean wantOutput = (ops & SelectionKey.OP_WRITE) != 0;
+ lock.lock();
+ try {
+ eventMask = ops;
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ if (wantOutput) {
+ requestOutput();
+ }
+ }
+
+ @Override
+ public void setEvent(final int op) {
+ final boolean wantOutput = (op & SelectionKey.OP_WRITE) != 0;
+ lock.lock();
+ try {
+ eventMask |= op;
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ if (wantOutput) {
+ requestOutput();
+ }
+ }
+
+ @Override
+ public void clearEvent(final int op) {
+ lock.lock();
+ try {
+ eventMask &= ~op;
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ close(CloseMode.GRACEFUL);
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ boolean cancel = false;
+
+ lock.lock();
+ try {
+ if (status == Status.CLOSED) {
+ return;
+ }
+ if (closeMode == CloseMode.IMMEDIATE) {
+ status = Status.CLOSED;
+ localEndStreamSent = true;
+ cancelPendingCommands();
+ inboundQueue.clear();
+ inboundBytes = 0;
+ outboundQueue.clear();
+ outboundBytes = 0;
+ consumedBytesSinceUpdate = 0;
+ cancel = true;
+ } else {
+ status = Status.CLOSING;
+ if (dataChannelRef.get() == null && outboundBytes == 0) {
+ status = Status.CLOSED;
+ localEndStreamSent = true;
+ cancel = true;
+ }
+ }
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+
+ if (cancel) {
+ cancelStream();
+ } else {
+ requestOutput();
+ }
+ }
+
+ @Override
+ public Status getStatus() {
+ lock.lock();
+ try {
+ return status;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Timeout getSocketTimeout() {
+ lock.lock();
+ try {
+ return socketTimeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ lock.lock();
+ try {
+ socketTimeout = timeout;
+ lastEventTime = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastReadTime() {
+ lock.lock();
+ try {
+ return lastReadTime;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastWriteTime() {
+ lock.lock();
+ try {
+ return lastWriteTime;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public long getLastEventTime() {
+ lock.lock();
+ try {
+ return lastEventTime;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void updateReadTime() {
+ final long now = System.currentTimeMillis();
+ lock.lock();
+ try {
+ lastReadTime = now;
+ lastEventTime = now;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void updateWriteTime() {
+ final long now = System.currentTimeMillis();
+ lock.lock();
+ try {
+ lastWriteTime = now;
+ lastEventTime = now;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int read(final ByteBuffer dst) throws IOException {
+ int total = 0;
+ int update = 0;
+ CapacityChannel currentCapacityChannel = null;
+
+ lock.lock();
+ try {
+ if (inboundQueue.isEmpty()) {
+ return remoteEndStream || status == Status.CLOSED ? -1 : 0;
+ }
+ while (dst.hasRemaining()) {
+ final ByteBuffer buffer = inboundQueue.peekFirst();
+ if (buffer == null) {
+ break;
+ }
+ final int chunk = Math.min(dst.remaining(), buffer.remaining());
+ if (chunk <= 0) {
+ break;
+ }
+
+ if (buffer.hasArray()) {
+ final int pos = buffer.position();
+ dst.put(buffer.array(), buffer.arrayOffset() + pos, chunk);
+ buffer.position(pos + chunk);
+ } else {
+ for (int i = 0; i < chunk; i++) {
+ dst.put(buffer.get());
+ }
+ }
+
+ total += chunk;
+ inboundBytes -= chunk;
+ if (!buffer.hasRemaining()) {
+ inboundQueue.pollFirst();
+ }
+ }
+
+ if (total > 0) {
+ consumedBytesSinceUpdate += total;
+ final long now = System.currentTimeMillis();
+ lastReadTime = now;
+ lastEventTime = now;
+ if (capacityChannel != null && consumedBytesSinceUpdate > 0) {
+ currentCapacityChannel = capacityChannel;
+ update = consumedBytesSinceUpdate;
+ consumedBytesSinceUpdate = 0;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (currentCapacityChannel != null && update > 0) {
+ currentCapacityChannel.update(update);
+ }
+ return total;
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ if (src == null || !src.hasRemaining()) {
+ return 0;
+ }
+ int bytesAccepted = 0;
+
+ lock.lock();
+ try {
+ if (status != Status.ACTIVE) {
+ return 0;
+ }
+ final int freeSpace = OUTBOUND_BUFFER_LIMIT - outboundBytes;
+ if (freeSpace <= 0) {
+ return 0;
+ }
+ bytesAccepted = Math.min(src.remaining(), freeSpace);
+ if (bytesAccepted <= 0) {
+ return 0;
+ }
+
+ final byte[] data = new byte[bytesAccepted];
+ src.get(data);
+
+ outboundQueue.addLast(ByteBuffer.wrap(data));
+ outboundBytes += bytesAccepted;
+
+ final long now = System.currentTimeMillis();
+ lastWriteTime = now;
+ lastEventTime = now;
+ } finally {
+ lock.unlock();
+ }
+
+ requestOutput();
+ return bytesAccepted;
+ }
+
+ @Override
+ public boolean isOpen() {
+ lock.lock();
+ try {
+ return status != Status.CLOSED && physicalSession.isOpen();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public int getPendingCommandCount() {
+ lock.lock();
+ try {
+ return commandQueue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java
new file mode 100644
index 0000000000..77acc476db
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java
@@ -0,0 +1,68 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.nio.support;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Exception indicating CONNECT tunnel refusal by the proxy.
+ *
+ * @since 5.5
+ */
+public final class TunnelRefusedException extends HttpException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final HttpResponse response;
+
+ public TunnelRefusedException(final HttpResponse response) {
+ super("Tunnel refused: " + new StatusLine(Args.notNull(response, "Response")));
+ this.response = copy(response);
+ }
+
+ public HttpResponse getResponse() {
+ return response;
+ }
+
+ public int getStatusCode() {
+ return response.getCode();
+ }
+
+ private static HttpResponse copy(final HttpResponse response) {
+ final BasicHttpResponse copy = new BasicHttpResponse(response.getCode(), response.getReasonPhrase());
+ copy.setVersion(response.getVersion());
+ for (final Header header : response.getHeaders()) {
+ copy.addHeader(header);
+ }
+ return copy;
+ }
+}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java
new file mode 100644
index 0000000000..c275fec2ee
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java
@@ -0,0 +1,178 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.examples;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.concurrent.FutureContribution;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler;
+import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
+import org.apache.hc.core5.http2.nio.support.H2OverH2TunnelSupport;
+import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.ssl.SSLContexts;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Full example of HTTP/2 request execution through an HTTP/2 proxy tunnel.
+ */
+public class H2ViaH2ProxyExecutionExample {
+
+ private static TlsStrategy createTlsStrategy() throws Exception {
+ final String trustStore = System.getProperty("h2.truststore");
+ if (trustStore == null || trustStore.isEmpty()) {
+ return new H2ClientTlsStrategy();
+ }
+ final String trustStorePassword = System.getProperty("h2.truststore.password", "changeit");
+ final SSLContext sslContext = SSLContexts.custom()
+ .loadTrustMaterial(new File(trustStore), trustStorePassword.toCharArray())
+ .build();
+ return new H2ClientTlsStrategy(sslContext);
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final String proxyScheme = System.getProperty("h2.proxy.scheme", "http");
+ final String proxyHost = System.getProperty("h2.proxy.host", "localhost");
+ final int proxyPort = Integer.parseInt(System.getProperty("h2.proxy.port", "8080"));
+ final String targetScheme = System.getProperty("h2.target.scheme", "https");
+ final String targetHost = System.getProperty("h2.target.host", "origin");
+ final int targetPort = Integer.parseInt(System.getProperty("h2.target.port", "9443"));
+ final String[] requestUris = System.getProperty("h2.paths", "/").split(",");
+
+ final TlsStrategy tlsStrategy = createTlsStrategy();
+
+ final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap()
+ .setH2Config(H2Config.custom().setPushEnabled(false).build())
+ .setTlsStrategy(tlsStrategy)
+ .create();
+ requester.start();
+
+ final HttpHost proxy = new HttpHost(proxyScheme, proxyHost, proxyPort);
+ final HttpHost target = new HttpHost(targetScheme, targetHost, targetPort);
+ final Timeout timeout = Timeout.ofSeconds(30);
+
+ final IOEventHandlerFactory tunnelProtocolStarter = (ioSession, attachment) ->
+ new ClientH2PrefaceHandler(ioSession, new ClientH2StreamMultiplexerFactory(
+ HttpProcessorBuilder.create().build(),
+ null,
+ H2Config.DEFAULT,
+ org.apache.hc.core5.http.config.CharCodingConfig.DEFAULT,
+ null), false, null);
+
+ final ComplexFuture tunnelFuture = new ComplexFuture<>(null);
+ tunnelFuture.setDependency(requester.getConnPool().getSession(proxy, timeout, new FutureContribution(tunnelFuture) {
+
+ @Override
+ public void completed(final IOSession proxySession) {
+ H2OverH2TunnelSupport.establish(
+ proxySession,
+ target,
+ timeout,
+ true,
+ tlsStrategy,
+ tunnelProtocolStarter,
+ new FutureContribution(tunnelFuture) {
+
+ @Override
+ public void completed(final IOSession tunnelSession) {
+ tunnelFuture.completed(tunnelSession);
+ }
+ });
+ }
+
+ }));
+
+ final IOSession tunnelSession = tunnelFuture.get(1, TimeUnit.MINUTES);
+ try {
+ final CountDownLatch latch = new CountDownLatch(requestUris.length);
+
+ for (final String requestUri : requestUris) {
+ final String normalizedRequestUri = requestUri.trim();
+ final AsyncClientExchangeHandler exchangeHandler = new org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler<>(
+ new BasicRequestProducer(Method.GET, target, normalizedRequestUri),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new FutureCallback>() {
+
+ @Override
+ public void completed(final Message message) {
+ final HttpResponse response = message.getHead();
+ final String body = message.getBody();
+ System.out.println(normalizedRequestUri + " -> " + response.getCode());
+ System.out.println(body);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println(normalizedRequestUri + " -> " + ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println(normalizedRequestUri + " cancelled");
+ latch.countDown();
+ }
+
+ });
+
+ tunnelSession.enqueue(
+ new RequestExecutionCommand(exchangeHandler, HttpCoreContext.create()),
+ Command.Priority.NORMAL);
+ }
+ latch.await();
+ } finally {
+ tunnelSession.close(CloseMode.GRACEFUL);
+ }
+ requester.close(CloseMode.GRACEFUL);
+ }
+
+}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java
new file mode 100644
index 0000000000..3728df6ca6
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java
@@ -0,0 +1,666 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.nio.support;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.StreamControl;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.ProtocolIOSession;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestH2OverH2TunnelSupport {
+
+ @Test
+ void testEstablishBuildsH2ConnectForAuthorityAndCompletes() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final HttpHost target = new HttpHost("https", "example.org", 443);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(session, target, Timeout.ofSeconds(1), false, null, callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNull(callback.failed);
+ Assertions.assertFalse(callback.cancelled);
+ Assertions.assertNotNull(session.capturedRequest);
+ Assertions.assertEquals("CONNECT", session.capturedRequest.getMethod());
+ Assertions.assertEquals("example.org", session.capturedRequest.getAuthority().getHostName());
+ Assertions.assertEquals(443, session.capturedRequest.getAuthority().getPort());
+ Assertions.assertNull(session.capturedRequest.getScheme());
+ Assertions.assertNull(session.capturedRequest.getPath());
+ }
+
+ @Test
+ void testEstablishAppliesConnectRequestInterceptor() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("https", "example.org", 443),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ (request, entityDetails, context) -> request.addHeader("Proxy-Authorization", "Basic dGVzdDp0ZXN0"),
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNotNull(session.capturedRequest);
+ Assertions.assertEquals("Basic dGVzdDp0ZXN0", session.capturedRequest.getFirstHeader("Proxy-Authorization").getValue());
+ }
+
+ @Test
+ void testEstablishFailsOnRefusedTunnel() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, false);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("https", "example.org", 443),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertFalse(callback.completed);
+ Assertions.assertNotNull(callback.failed);
+ Assertions.assertInstanceOf(TunnelRefusedException.class, callback.failed);
+ Assertions.assertEquals(
+ HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED,
+ ((TunnelRefusedException) callback.failed).getStatusCode());
+ }
+
+ @Test
+ void testEstablishFailsWhenConnectResponseHasNoTunnelStream() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, false);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("https", "example.org", 443),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertFalse(callback.completed);
+ Assertions.assertNotNull(callback.failed);
+ }
+
+ @Test
+ void testEstablishWithProtocolStarterInvokesConnectedAndSeesInputBuffer() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, true, false);
+ final RecordingProtocolStarter protocolStarter = new RecordingProtocolStarter();
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ protocolStarter,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNull(callback.failed);
+ Assertions.assertTrue(protocolStarter.connectedCalled);
+ Assertions.assertTrue(protocolStarter.inputBufferSeen);
+ }
+
+ @Test
+ void testEstablishSecureTunnelUsesTlsStrategy() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final RecordingTlsStrategy tlsStrategy = new RecordingTlsStrategy();
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("https", "example.org", 443),
+ Timeout.ofSeconds(1),
+ true,
+ tlsStrategy,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNull(callback.failed);
+ Assertions.assertTrue(tlsStrategy.invoked);
+ }
+
+ @Test
+ void testClosingTunnelDoesNotClosePhysicalSession() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNotNull(callback.result);
+ callback.result.close(CloseMode.IMMEDIATE);
+ Assertions.assertTrue(session.isOpen(), "Closing tunnel session must not close physical HTTP/2 connection");
+ }
+
+ @Test
+ void testTunnelImmediateCloseCancelsStreamControlWhenPresent() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, false, true);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNotNull(callback.result);
+ callback.result.close(CloseMode.IMMEDIATE);
+
+ Assertions.assertTrue(session.streamCancelCalled.get(), "IMMEDIATE close must cancel the CONNECT stream");
+ Assertions.assertTrue(session.isOpen(), "Cancelling tunnel stream must not close physical HTTP/2 connection");
+ }
+
+ @Test
+ void testTunnelWriteBufferIsBounded() throws Exception {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNotNull(callback.result);
+ final int payloadSize = 1024 * 1024;
+ final ByteBuffer src = ByteBuffer.allocate(payloadSize);
+ final int written = callback.result.write(src);
+ Assertions.assertTrue(written > 0);
+ Assertions.assertTrue(written < payloadSize, "Outbound writes must be bounded by tunnel buffer capacity");
+ }
+
+ @Test
+ void testCapacityUpdateIsNotUnbounded() {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, true, false, false);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertTrue(session.lastCapacityUpdate > 0, "Tunnel setup should publish initial bounded capacity");
+ Assertions.assertTrue(session.lastCapacityUpdate < Integer.MAX_VALUE, "Capacity must not be unbounded");
+ }
+
+ @Test
+ void testGracefulCloseEndsStreamAfterDrain() throws Exception {
+ final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true);
+ final RecordingCallback callback = new RecordingCallback<>();
+
+ H2OverH2TunnelSupport.establish(
+ session,
+ new HttpHost("http", "example.org", 80),
+ Timeout.ofSeconds(1),
+ false,
+ null,
+ callback);
+
+ Assertions.assertTrue(callback.completed);
+ Assertions.assertNotNull(callback.result);
+
+ final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
+ final int written = callback.result.write(src);
+ Assertions.assertEquals(5, written);
+
+ callback.result.close(CloseMode.GRACEFUL);
+
+ // Drive output flush: test runs in same package -> can call package-private method.
+ ((H2TunnelProtocolIOSession) callback.result).onOutputReady();
+
+ Assertions.assertTrue(session.endStreamCalled.get(), "GRACEFUL close must end the CONNECT stream after draining");
+ Assertions.assertTrue(session.isOpen(), "Ending tunnel stream must not close physical HTTP/2 connection");
+ }
+
+ static class RecordingCallback implements FutureCallback {
+
+ volatile boolean completed;
+ volatile boolean cancelled;
+ volatile Exception failed;
+ volatile T result;
+
+ @Override
+ public void completed(final T result) {
+ this.completed = true;
+ this.result = result;
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ this.failed = ex;
+ }
+
+ @Override
+ public void cancelled() {
+ this.cancelled = true;
+ }
+ }
+
+ static class RecordingProtocolStarter implements IOEventHandlerFactory {
+
+ volatile boolean connectedCalled;
+ volatile boolean inputBufferSeen;
+
+ @Override
+ public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) {
+ return new IOEventHandler() {
+
+ @Override
+ public void connected(final IOSession session) {
+ connectedCalled = true;
+ }
+
+ @Override
+ public void inputReady(final IOSession session, final ByteBuffer src) {
+ if (src != null && src.hasRemaining()) {
+ inputBufferSeen = true;
+ }
+ }
+
+ @Override
+ public void outputReady(final IOSession session) {
+ }
+
+ @Override
+ public void timeout(final IOSession session, final Timeout timeout) {
+ }
+
+ @Override
+ public void exception(final IOSession session, final Exception cause) {
+ }
+
+ @Override
+ public void disconnected(final IOSession session) {
+ }
+ };
+ }
+ }
+
+ static class RecordingTlsStrategy implements TlsStrategy {
+
+ volatile boolean invoked;
+
+ @Override
+ public boolean upgrade(
+ final TransportSecurityLayer sessionLayer,
+ final HttpHost host,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress,
+ final Object attachment,
+ final Timeout handshakeTimeout) {
+ invoked = true;
+ return true;
+ }
+
+ @Override
+ public void upgrade(
+ final TransportSecurityLayer sessionLayer,
+ final NamedEndpoint endpoint,
+ final Object attachment,
+ final Timeout handshakeTimeout,
+ final FutureCallback callback) {
+ invoked = true;
+ if (callback != null) {
+ callback.completed(sessionLayer);
+ }
+ }
+ }
+
+ static class ScriptedProxySession implements IOSession {
+
+ private final int responseCode;
+ private final boolean withTunnelStream;
+ private final boolean signalCapacity;
+ private final boolean emitInputData;
+ private final boolean provideStreamControl;
+
+ private final Lock lock;
+ private Timeout socketTimeout;
+
+ HttpRequest capturedRequest;
+ volatile int lastCapacityUpdate;
+
+ private final AtomicBoolean open;
+ final AtomicBoolean streamCancelCalled;
+ final AtomicBoolean endStreamCalled;
+
+ ScriptedProxySession(final int responseCode, final boolean withTunnelStream) {
+ this(responseCode, withTunnelStream, false, false, false);
+ }
+
+ ScriptedProxySession(final int responseCode, final boolean withTunnelStream, final boolean signalCapacity) {
+ this(responseCode, withTunnelStream, signalCapacity, false, false);
+ }
+
+ ScriptedProxySession(
+ final int responseCode,
+ final boolean withTunnelStream,
+ final boolean signalCapacity,
+ final boolean emitInputData,
+ final boolean provideStreamControl) {
+ this.responseCode = responseCode;
+ this.withTunnelStream = withTunnelStream;
+ this.signalCapacity = signalCapacity;
+ this.emitInputData = emitInputData;
+ this.provideStreamControl = provideStreamControl;
+
+ this.lock = new ReentrantLock();
+ this.socketTimeout = Timeout.ofSeconds(30);
+ this.lastCapacityUpdate = -1;
+ this.open = new AtomicBoolean(true);
+ this.streamCancelCalled = new AtomicBoolean(false);
+ this.endStreamCalled = new AtomicBoolean(false);
+ }
+
+ @Override
+ public IOEventHandler getHandler() {
+ return null;
+ }
+
+ @Override
+ public void upgrade(final IOEventHandler handler) {
+ }
+
+ @Override
+ public Lock getLock() {
+ return lock;
+ }
+
+ @Override
+ public void enqueue(final Command command, final Command.Priority priority) {
+ if (!(command instanceof RequestExecutionCommand)) {
+ return;
+ }
+
+ final RequestExecutionCommand requestExecutionCommand = (RequestExecutionCommand) command;
+ final AsyncClientExchangeHandler exchangeHandler = requestExecutionCommand.getExchangeHandler();
+ final org.apache.hc.core5.http.protocol.HttpContext context = requestExecutionCommand.getContext();
+
+ try {
+ if (provideStreamControl && exchangeHandler instanceof H2OverH2TunnelExchangeHandler) {
+ final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance(
+ StreamControl.class.getClassLoader(),
+ new Class>[]{StreamControl.class},
+ (proxy, method, args) -> {
+ if ("cancel".equals(method.getName())) {
+ streamCancelCalled.set(true);
+ return method.getReturnType() == Boolean.TYPE ? Boolean.TRUE : null;
+ }
+ return defaultValue(method);
+ });
+
+ ((H2OverH2TunnelExchangeHandler) exchangeHandler).initiated(streamControl);
+ }
+
+ exchangeHandler.produceRequest((RequestChannel) (request, entityDetails, requestContext) -> capturedRequest = request, context);
+
+ if (signalCapacity) {
+ exchangeHandler.updateCapacity((CapacityChannel) increment -> lastCapacityUpdate = increment);
+ }
+
+ final EntityDetails responseEntityDetails =
+ withTunnelStream ? new org.apache.hc.core5.http.impl.BasicEntityDetails(-1, null) : null;
+ exchangeHandler.consumeResponse(new BasicHttpResponse(responseCode), responseEntityDetails, context);
+
+ if (withTunnelStream && responseCode == HttpStatus.SC_OK) {
+ exchangeHandler.produce(new DataStreamChannel() {
+
+ @Override
+ public void requestOutput() {
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int remaining = src != null ? src.remaining() : 0;
+ if (remaining > 0 && src != null) {
+ final byte[] tmp = new byte[remaining];
+ src.get(tmp);
+ }
+ return remaining;
+ }
+
+ @Override
+ public void endStream() throws IOException {
+
+ }
+
+ @Override
+ public void endStream(final java.util.List extends org.apache.hc.core5.http.Header> trailers) {
+ endStreamCalled.set(true);
+ }
+
+ });
+
+ if (emitInputData) {
+ exchangeHandler.consume(ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ }
+ }
+ } catch (final Exception ex) {
+ exchangeHandler.failed(ex);
+ }
+ }
+
+ private static Object defaultValue(final Method method) {
+ final Class> rt = method.getReturnType();
+ if (rt == Void.TYPE) {
+ return null;
+ }
+ if (rt == Boolean.TYPE) {
+ return false;
+ }
+ if (rt == Integer.TYPE) {
+ return 0;
+ }
+ if (rt == Long.TYPE) {
+ return 0L;
+ }
+ if (rt == Short.TYPE) {
+ return (short) 0;
+ }
+ if (rt == Byte.TYPE) {
+ return (byte) 0;
+ }
+ if (rt == Character.TYPE) {
+ return (char) 0;
+ }
+ if (rt == Float.TYPE) {
+ return 0f;
+ }
+ if (rt == Double.TYPE) {
+ return 0d;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean hasCommands() {
+ return false;
+ }
+
+ @Override
+ public Command poll() {
+ return null;
+ }
+
+ @Override
+ public ByteChannel channel() {
+ return this;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return null;
+ }
+
+ @Override
+ public int getEventMask() {
+ return 0;
+ }
+
+ @Override
+ public void setEventMask(final int ops) {
+ }
+
+ @Override
+ public void setEvent(final int op) {
+ }
+
+ @Override
+ public void clearEvent(final int op) {
+ }
+
+ @Override
+ public void close() {
+ open.set(false);
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ open.set(false);
+ }
+
+ @Override
+ public Status getStatus() {
+ return open.get() ? Status.ACTIVE : Status.CLOSED;
+ }
+
+ @Override
+ public Timeout getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ this.socketTimeout = timeout;
+ }
+
+ @Override
+ public long getLastReadTime() {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime() {
+ return 0;
+ }
+
+ @Override
+ public long getLastEventTime() {
+ return 0;
+ }
+
+ @Override
+ public void updateReadTime() {
+ }
+
+ @Override
+ public void updateWriteTime() {
+ }
+
+ @Override
+ public int read(final ByteBuffer dst) {
+ return 0;
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int remaining = src != null ? src.remaining() : 0;
+ if (remaining > 0) {
+ final byte[] tmp = new byte[remaining];
+ src.get(tmp);
+ }
+ return remaining;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open.get();
+ }
+
+ @Override
+ public String getId() {
+ return "proxy-session";
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java
new file mode 100644
index 0000000000..fe5225fcd8
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java
@@ -0,0 +1,359 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.nio.support;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.StreamControl;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestH2TunnelRawIOSession {
+
+ @Test
+ void testCapacityInitialUpdateIsBounded() throws Exception {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null);
+
+ final AtomicInteger last = new AtomicInteger(0);
+ raw.updateCapacityChannel(new CapacityChannel() {
+ @Override
+ public void update(final int increment) {
+ last.set(increment);
+ }
+ });
+
+ Assertions.assertTrue(last.get() > 0);
+ Assertions.assertTrue(last.get() < Integer.MAX_VALUE);
+ }
+
+ @Test
+ void testAppendInputOverflowFails() throws Exception {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null);
+
+ // INBOUND_BUFFER_LIMIT is 64k in the implementation; overflow by 1.
+ final ByteBuffer tooBig = ByteBuffer.allocate(64 * 1024 + 1);
+ Assertions.assertThrows(IOException.class, () -> raw.appendInput(tooBig));
+ }
+
+ @Test
+ void testReadTriggersCapacityUpdateOnConsumption() throws Exception {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null);
+
+ final AtomicInteger last = new AtomicInteger(-1);
+ raw.updateCapacityChannel(new CapacityChannel() {
+ @Override
+ public void update(final int increment) {
+ last.set(increment);
+ }
+ });
+
+ raw.appendInput(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+
+ final ByteBuffer dst = ByteBuffer.allocate(4);
+ final int n = raw.read(dst);
+ Assertions.assertEquals(4, n);
+ // Capacity update should publish the consumed bytes (4), not unbounded.
+ Assertions.assertEquals(4, last.get());
+ }
+
+ @Test
+ void testWriteIsBounded() {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null);
+
+ final ByteBuffer src = ByteBuffer.allocate(1024 * 1024);
+ final int n = raw.write(src);
+ Assertions.assertTrue(n > 0);
+ Assertions.assertTrue(n < 1024 * 1024);
+ Assertions.assertEquals(64 * 1024, n, "Expected OUTBOUND_BUFFER_LIMIT (64k) write bound");
+ }
+
+ @Test
+ void testImmediateCloseCancelsStreamControlButNotPhysicalSession() {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+ final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance(
+ StreamControl.class.getClassLoader(),
+ new Class>[]{StreamControl.class},
+ (proxy, method, args) -> {
+ final String name = method.getName();
+ final Class> rt = method.getReturnType();
+
+ if ("cancel".equals(name)) {
+ cancelled.set(true);
+ // IMPORTANT: cancel() may return boolean (Cancellable)
+ return rt == Boolean.TYPE ? Boolean.TRUE : null;
+ }
+
+ if (rt == Void.TYPE) {
+ return null;
+ }
+ if (rt == Boolean.TYPE) {
+ return false;
+ }
+ if (rt == Integer.TYPE) {
+ return 0;
+ }
+ if (rt == Long.TYPE) {
+ return 0L;
+ }
+ if (rt == Short.TYPE) {
+ return (short) 0;
+ }
+ if (rt == Byte.TYPE) {
+ return (byte) 0;
+ }
+ if (rt == Character.TYPE) {
+ return (char) 0;
+ }
+ if (rt == Float.TYPE) {
+ return 0f;
+ }
+ if (rt == Double.TYPE) {
+ return 0d;
+ }
+ return null;
+ });
+
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), streamControl);
+ raw.close(CloseMode.IMMEDIATE);
+
+ Assertions.assertTrue(cancelled.get(), "IMMEDIATE close must cancel stream");
+ Assertions.assertTrue(physical.isOpen(), "Tunnel close must not close physical HTTP/2 connection");
+ }
+
+ @Test
+ void testGracefulCloseEndsStreamAfterDrain() throws Exception {
+ final DummyPhysicalSession physical = new DummyPhysicalSession();
+
+ final AtomicBoolean endStreamCalled = new AtomicBoolean(false);
+ rawAttachChannel(physical, endStreamCalled);
+
+ final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null);
+ raw.attachChannel(physical.dataChannel);
+
+ // Put some outbound bytes
+ final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
+ final int written = raw.write(src);
+ Assertions.assertEquals(5, written);
+
+ raw.close(CloseMode.GRACEFUL);
+ raw.flushOutput();
+
+ Assertions.assertTrue(endStreamCalled.get(), "GRACEFUL close must endStream once outbound drained");
+ }
+
+ private static void rawAttachChannel(final DummyPhysicalSession physical, final AtomicBoolean endStreamCalled) {
+ physical.dataChannel = new DataStreamChannel() {
+
+ @Override
+ public void requestOutput() {
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int remaining = src != null ? src.remaining() : 0;
+ if (remaining > 0 && src != null) {
+ final byte[] tmp = new byte[remaining];
+ src.get(tmp);
+ }
+ return remaining;
+ }
+
+ @Override
+ public void endStream() throws IOException {
+
+ }
+
+ @Override
+ public void endStream(final java.util.List extends org.apache.hc.core5.http.Header> trailers) {
+ endStreamCalled.set(true);
+ }
+ };
+ }
+
+ static final class DummyPhysicalSession implements IOSession {
+
+ private final Lock lock = new ReentrantLock();
+ private volatile boolean open = true;
+ private volatile Timeout socketTimeout = Timeout.ofSeconds(30);
+
+ volatile DataStreamChannel dataChannel;
+
+ @Override
+ public IOEventHandler getHandler() {
+ return null;
+ }
+
+ @Override
+ public void upgrade(final IOEventHandler handler) {
+ }
+
+ @Override
+ public Lock getLock() {
+ return lock;
+ }
+
+ @Override
+ public void enqueue(final Command command, final Command.Priority priority) {
+ }
+
+ @Override
+ public boolean hasCommands() {
+ return false;
+ }
+
+ @Override
+ public Command poll() {
+ return null;
+ }
+
+ @Override
+ public ByteChannel channel() {
+ return this;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return null;
+ }
+
+ @Override
+ public int getEventMask() {
+ return 0;
+ }
+
+ @Override
+ public void setEventMask(final int ops) {
+ }
+
+ @Override
+ public void setEvent(final int op) {
+ }
+
+ @Override
+ public void clearEvent(final int op) {
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ open = false;
+ }
+
+ @Override
+ public Status getStatus() {
+ return open ? Status.ACTIVE : Status.CLOSED;
+ }
+
+ @Override
+ public Timeout getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ socketTimeout = timeout;
+ }
+
+ @Override
+ public long getLastReadTime() {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime() {
+ return 0;
+ }
+
+ @Override
+ public long getLastEventTime() {
+ return 0;
+ }
+
+ @Override
+ public void updateReadTime() {
+ }
+
+ @Override
+ public void updateWriteTime() {
+ }
+
+ @Override
+ public int read(final ByteBuffer dst) {
+ return 0;
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int remaining = src != null ? src.remaining() : 0;
+ if (remaining > 0 && src != null) {
+ final byte[] tmp = new byte[remaining];
+ src.get(tmp);
+ }
+ return remaining;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public String getId() {
+ return "dummy-physical";
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java
new file mode 100644
index 0000000000..d5aed28f08
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java
@@ -0,0 +1,323 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.testing.nio;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.config.CharCodingConfig;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler;
+import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
+import org.apache.hc.core5.http2.nio.support.H2OverH2TunnelSupport;
+import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.ssl.SSLContexts;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Full integration test for HTTP/2 CONNECT tunneling over a real proxy container.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class H2OverH2TunnelContainerIT {
+
+ private static final String ORIGIN_BODY = "h2-tunnel-it-ok";
+ private static final int ORIGIN_PORT = 9443;
+ private static final int PROXY_PORT = 8080;
+ private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(90);
+ private static final String ORIGIN_ALIAS = "h2-origin";
+
+ private static Network network;
+ private static GenericContainer> originContainer;
+ private static GenericContainer> proxyContainer;
+
+ @BeforeAll
+ @SuppressWarnings("resource")
+ static void setUp() throws Exception {
+ network = Network.newNetwork();
+
+ originContainer = new GenericContainer<>(DockerImageName.parse("nginx:1.27.4-alpine"))
+ .withNetwork(network)
+ .withNetworkAliases(ORIGIN_ALIAS)
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("h2-tunnel-it/nginx-tls.conf"),
+ "/etc/nginx/nginx.conf")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("h2-tunnel-it/certs/origin.crt"),
+ "/etc/nginx/certs/origin.crt")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("h2-tunnel-it/certs/origin.key"),
+ "/etc/nginx/certs/origin.key")
+ .withExposedPorts(ORIGIN_PORT)
+ .waitingFor(Wait.forListeningPort())
+ .withStartupTimeout(STARTUP_TIMEOUT);
+ originContainer.start();
+
+ proxyContainer = new GenericContainer<>(DockerImageName.parse("envoyproxy/envoy:v1.31.2"))
+ .withNetwork(network)
+ .withCommand("envoy", "-c", "/etc/envoy/envoy.yaml", "-l", "info")
+ .withCopyFileToContainer(
+ MountableFile.forClasspathResource("h2-tunnel-it/envoy.yaml"),
+ "/etc/envoy/envoy.yaml")
+ .withExposedPorts(PROXY_PORT)
+ .waitingFor(Wait.forListeningPort())
+ .withStartupTimeout(STARTUP_TIMEOUT);
+ proxyContainer.start();
+ }
+
+ @AfterAll
+ static void tearDown() {
+ if (proxyContainer != null) {
+ proxyContainer.close();
+ }
+ if (originContainer != null) {
+ originContainer.close();
+ }
+ if (network != null) {
+ network.close();
+ }
+ }
+
+ @Test
+ void testHttp2TunnelOverHttp2ProxyAndConnectionReuse() throws Exception {
+ final Timeout timeout = Timeout.ofSeconds(30);
+ final HttpHost proxy = new HttpHost(
+ "http",
+ proxyContainer.getHost(),
+ proxyContainer.getMappedPort(PROXY_PORT));
+ final HttpHost target = new HttpHost("https", ORIGIN_ALIAS, ORIGIN_PORT);
+
+ final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap()
+ .setH2Config(H2Config.custom().setPushEnabled(false).build())
+ .create();
+ requester.start();
+ try {
+ final IOSession proxySession = awaitProxySession(requester, proxy, timeout);
+
+ final IOSession tunnelA = awaitTunnelSession(
+ proxySession,
+ target,
+ timeout,
+ true,
+ createTunnelTlsStrategy());
+ assertOriginResponse(executeGet(tunnelA, target, "/a"));
+
+ final IOSession tunnelB = awaitTunnelSession(
+ proxySession,
+ target,
+ timeout,
+ true,
+ createTunnelTlsStrategy());
+ assertOriginResponse(executeGet(tunnelB, target, "/b"));
+
+ tunnelA.close(CloseMode.IMMEDIATE);
+ Assertions.assertTrue(proxySession.isOpen(),
+ "Closing one tunnel must not close the shared proxy HTTP/2 connection");
+
+ assertOriginResponse(executeGet(tunnelB, target, "/still-open"));
+ tunnelB.close(CloseMode.GRACEFUL);
+ } finally {
+ requester.close(CloseMode.GRACEFUL);
+ }
+ }
+
+ private static IOEventHandlerFactory tunnelProtocolStarter() {
+ return (ioSession, attachment) -> new ClientH2PrefaceHandler(
+ ioSession,
+ new ClientH2StreamMultiplexerFactory(
+ HttpProcessorBuilder.create().build(),
+ null,
+ H2Config.DEFAULT,
+ CharCodingConfig.DEFAULT,
+ null),
+ false,
+ null);
+ }
+
+ private static IOSession awaitProxySession(
+ final H2MultiplexingRequester requester,
+ final HttpHost proxy,
+ final Timeout timeout) throws Exception {
+
+ final CompletableFuture resultFuture = new CompletableFuture<>();
+ requester.getConnPool().getSession(proxy, timeout, new FutureCallback() {
+
+ @Override
+ public void completed(final IOSession result) {
+ resultFuture.complete(result);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ resultFuture.completeExceptionally(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ resultFuture.cancel(false);
+ }
+
+ });
+ return resultFuture.get(1, TimeUnit.MINUTES);
+ }
+
+ private static IOSession awaitTunnelSession(
+ final IOSession proxySession,
+ final HttpHost target,
+ final Timeout timeout,
+ final boolean secure,
+ final TlsStrategy tlsStrategy) throws Exception {
+
+ final CompletableFuture resultFuture = new CompletableFuture<>();
+ H2OverH2TunnelSupport.establish(
+ proxySession,
+ target,
+ timeout,
+ secure,
+ tlsStrategy,
+ tunnelProtocolStarter(),
+ new FutureCallback() {
+
+ @Override
+ public void completed(final IOSession result) {
+ resultFuture.complete(result);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ resultFuture.completeExceptionally(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ resultFuture.cancel(false);
+ }
+
+ });
+ try {
+ return resultFuture.get(1, TimeUnit.MINUTES);
+ } catch (final Exception ex) {
+ final String proxyLogs = proxyContainer != null ? proxyContainer.getLogs() : "";
+ final String originLogs = originContainer != null ? originContainer.getLogs() : "";
+ throw new IllegalStateException(
+ "Tunnel establishment failed. Proxy logs:\n" + proxyLogs + "\nOrigin logs:\n" + originLogs,
+ ex);
+ }
+ }
+
+ private static Message executeGet(
+ final IOSession tunnelSession,
+ final HttpHost target,
+ final String requestUri) throws Exception {
+
+ final CompletableFuture> responseFuture = new CompletableFuture<>();
+
+ final AsyncClientExchangeHandler exchangeHandler = new org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler<>(
+ new BasicRequestProducer(Method.GET, target, requestUri),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new FutureCallback>() {
+
+ @Override
+ public void completed(final Message message) {
+ responseFuture.complete(message);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ responseFuture.completeExceptionally(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ responseFuture.cancel(false);
+ }
+
+ });
+
+ tunnelSession.enqueue(
+ new RequestExecutionCommand(exchangeHandler, HttpCoreContext.create()),
+ Command.Priority.NORMAL);
+
+ try {
+ return responseFuture.get(1, TimeUnit.MINUTES);
+ } catch (final Exception ex) {
+ final String proxyLogs = proxyContainer != null ? proxyContainer.getLogs() : "";
+ final String originLogs = originContainer != null ? originContainer.getLogs() : "";
+ throw new IllegalStateException(
+ "Tunnel request failed. Proxy logs:\n" + proxyLogs + "\nOrigin logs:\n" + originLogs,
+ ex);
+ }
+ }
+
+ private static TlsStrategy createTunnelTlsStrategy() throws Exception {
+ final SSLContext sslContext = SSLContexts.custom()
+ .loadTrustMaterial(null, (chain, authType) -> true)
+ .build();
+ return new H2ClientTlsStrategy(sslContext);
+ }
+
+ private static void assertOriginResponse(final Message message) {
+ Assertions.assertNotNull(message);
+ Assertions.assertNotNull(message.getHead());
+ Assertions.assertEquals(HttpStatus.SC_OK, message.getHead().getCode());
+ Assertions.assertNotNull(message.getBody());
+ Assertions.assertEquals(ORIGIN_BODY, message.getBody().trim());
+ }
+
+}
\ No newline at end of file
diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt
new file mode 100644
index 0000000000..6477bf6bfd
--- /dev/null
+++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHzCCAgegAwIBAgIUN0piqpG3WxzuNDapinZ4C5iLoWcwDQYJKoZIhvcNAQEL
+BQAwFDESMBAGA1UEAwwJaDItb3JpZ2luMB4XDTI2MDMwMzEzNDcwNFoXDTM2MDIy
+OTEzNDcwNFowFDESMBAGA1UEAwwJaDItb3JpZ2luMIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEAumfV8zdii2PnkhxIth3gvW9bTagDTWBGn3xjLLlbqh+i
+5WqdEXfV2TzE8Ps36WweJTOQxOJQUjcuprxLHu+eCdD8WOHby4eCNez0IY8dB2At
+IQE6IzQJV/3+ir+dRL4vBJPWplRNJmuDVbsbatyaIHWWRcXeZimxgTuUs0Lkp7QR
+l32C6uEnycDH+ftKJbLNZa0iBmDYar7CJ5Sdf64S9kwhU/eoCDB6woZCGzL55gHe
+jHdqNNKYB+INfLcpdcXzPGWrCJNYlCfRMw05UHXCydCoAA85tdd219X5pvtxwCo7
+aFYueSFFpiBcBqCZQYOEXdSoCR/8Hjvqh4zcHCychQIDAQABo2kwZzAdBgNVHQ4E
+FgQUtZmPTJvO/Z7jdiBnal8cxGVZhhowHwYDVR0jBBgwFoAUtZmPTJvO/Z7jdiBn
+al8cxGVZhhowDwYDVR0TAQH/BAUwAwEB/zAUBgNVHREEDTALggloMi1vcmlnaW4w
+DQYJKoZIhvcNAQELBQADggEBAB5woEcorqa5b61ijwK6q98/61EUqEVMiGqu93Ta
+i75abG4Nk6Jdhh6QskLVOb1gqlmwzDQ8IobO3dodm4SrmzusZg7tIHoPcZAGf35a
+e+J5nH8ryAAKwvhRV/Tdu1pQi5EGc+WW/18zJZGyxjDincbUTl1uwR8uNmTsjawr
+zz09NF9nUTR04dFu39Wv9kOHDptuloz82Jw7VdjtUhuDfheNOTOxERxU5JKhJhcJ
+XINkOL8NV9mqXFadQKkViaTjcAdbtXmNd0BxDxlCU3nLLL7jk1AaViSuumQjYo7R
+5dpxYuk/45FF0Jb4vDC+r40Jue7r2O5+cZp7IO3E+YEybMI=
+-----END CERTIFICATE-----
diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key
new file mode 100644
index 0000000000..a381c78a77
--- /dev/null
+++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC6Z9XzN2KLY+eS
+HEi2HeC9b1tNqANNYEaffGMsuVuqH6Llap0Rd9XZPMTw+zfpbB4lM5DE4lBSNy6m
+vEse754J0PxY4dvLh4I17PQhjx0HYC0hATojNAlX/f6Kv51Evi8Ek9amVE0ma4NV
+uxtq3JogdZZFxd5mKbGBO5SzQuSntBGXfYLq4SfJwMf5+0olss1lrSIGYNhqvsIn
+lJ1/rhL2TCFT96gIMHrChkIbMvnmAd6Md2o00pgH4g18tyl1xfM8ZasIk1iUJ9Ez
+DTlQdcLJ0KgADzm113bX1fmm+3HAKjtoVi55IUWmIFwGoJlBg4Rd1KgJH/weO+qH
+jNwcLJyFAgMBAAECggEAAu2dpUfx8tmbaiaql73JaYBl0Ub54k3IXjoAftPclkQP
+9YWiuQMGZ3a2a0iu/Ko3oQL527XoaBo4z+K2VWKTO3k+dZD6uGxFBd7WiO5sGNEQ
+dGvGA4aOPQUe6gQPjuRj7bD61rsNSTS4J/EcAaY8f5UJSshMcZNnF+4dLGG5IM9C
+9RaP2npSeGE55y0cmUw0FqlxnuQYocGdZHZUglngRan22VI+gY/8qrZ4Q4BNWdPn
+EGE28lYCxRkzRocybV5dUsgaXIYOx+Wl1+XprcZWZqVlauQ5UMMBVJg9mxaNGStn
+C5Fqa86lAtNHIJcMiSey/k6qpBGdIBhIjW1V1EMcAwKBgQDogkLm59E8h8H66Hq/
+M0Uh0n8x/8nxLoDBzn7ZNF6VDwRvD+R8LuGvS4UwmBUJmAj8fbCFf3IOTRCuup+d
+cEzYD7dMUiVn+wJjIoT8WsKxMZ0VRBOSuOOraGYgPgIIcLXhpMH4EC3oLChedWhI
+kQSL84aX0D1gyMOiNpffs2ZGXwKBgQDNPSKA5czqtdL3PcK+UZd5YNEdkoiZcKYn
+niiMTLTqj1GD8X7C9efqP+Qa2CTQUaOeOfmqAti9rGcY83VtytcNtdZ+YsYj1Zbi
+rUBa4I8X0LLfn9lI/xxN1b6gQoQ+fDbAt+bFldF3t9B28sbkmBGFaAb1TtQizdia
+co35plWfmwKBgQDmXbG1oDeSdpu+YrrDWCQF186IlnvaB44w98x8nkOcAk4NUDy8
+waKAER48wGIPqGA28r2D93rlKnv98xAUaGDqrd+ZscY4GN4LpPcIJVDDSXnuyQ1v
+kNqaSQzuoyFWhX3fvGMmybkCUUYKGN+jDnPnyfgv0HYPv7r9rIObc99AlwKBgEeI
+eVAnyCY+PUuDMS8YTQ03G2uNOSMRyjegvk04Jw5h6W1tbFsTTkOtBRn+H8ajzb1G
+Q6hn2ZcyUbS2lkUwH4hdyma+koTG2xIihH2oKveH+/BJTHhOwlS2nPxKcsE8lfDR
+qBNRxnJNlNEAiSX/govW2CYD1ZhT2pzqNGXA/bLlAoGBAL7EPoMbfPIZitrCrApA
+wfI2XUUCjJFF+s+eKHYnrNB9GDsPUxckg7jIhf5ERSWtzbcSD7wXIYcVYBBdtZ9o
+/p5XmsHfRNQn6PzVtc/R9JzEkAk2a2m66ec+QbHs561f36WXNPDhbASuWY7O6NYe
+086+jvxYtE6/W9zuoEEygTxo
+-----END PRIVATE KEY-----
diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml b/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml
new file mode 100644
index 0000000000..4d1018d46c
--- /dev/null
+++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+static_resources:
+ listeners:
+ - name: h2_connect_proxy_listener
+ address:
+ socket_address:
+ address: 0.0.0.0
+ port_value: 8080
+ filter_chains:
+ - filters:
+ - name: envoy.filters.network.http_connection_manager
+ typed_config:
+ "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
+ stat_prefix: h2_connect_proxy
+ codec_type: HTTP2
+ http2_protocol_options:
+ allow_connect: true
+ route_config:
+ name: local_route
+ virtual_hosts:
+ - name: any
+ domains: ["*"]
+ routes:
+ - match:
+ connect_matcher: {}
+ route:
+ cluster: dynamic_forward_proxy_cluster
+ timeout: 0s
+ upgrade_configs:
+ - upgrade_type: CONNECT
+ connect_config: {}
+ access_log:
+ - name: envoy.access_loggers.stdout
+ typed_config:
+ "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
+ log_format:
+ text_format_source:
+ inline_string: "[%START_TIME%] \"%REQ(:METHOD)% %REQ(:AUTHORITY)% %PROTOCOL%\" %RESPONSE_CODE% %RESPONSE_FLAGS% upstream=%UPSTREAM_HOST%\n"
+ http_filters:
+ - name: envoy.filters.http.dynamic_forward_proxy
+ typed_config:
+ "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig
+ dns_cache_config:
+ name: local_dns_cache
+ dns_lookup_family: V4_ONLY
+ - name: envoy.filters.http.router
+ typed_config:
+ "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
+
+ clusters:
+ - name: dynamic_forward_proxy_cluster
+ connect_timeout: 5s
+ lb_policy: CLUSTER_PROVIDED
+ cluster_type:
+ name: envoy.clusters.dynamic_forward_proxy
+ typed_config:
+ "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig
+ dns_cache_config:
+ name: local_dns_cache
+ dns_lookup_family: V4_ONLY
\ No newline at end of file
diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf b/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf
new file mode 100644
index 0000000000..4bd8f2c374
--- /dev/null
+++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+worker_processes 1;
+
+events {
+ worker_connections 1024;
+}
+
+http {
+ server {
+ listen 9443 ssl http2;
+ server_name h2-origin;
+
+ ssl_certificate /etc/nginx/certs/origin.crt;
+ ssl_certificate_key /etc/nginx/certs/origin.key;
+ ssl_protocols TLSv1.2 TLSv1.3;
+
+ location / {
+ default_type text/plain;
+ return 200 "h2-tunnel-it-ok";
+ }
+ }
+}
\ No newline at end of file