Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ PROXY_METRICS_PORT=9090
PROXY_HEALTH_PORT=8081
PROXY_SHARED_HEALTH_PORT=true
PROXY_SHARED_METRICS_PORT=true

# Proxy Configuration (optional)
# PROXY_URL=http://proxy.example.com:8080
# PROXY_URL=socks5://proxy.example.com:1080
# PROXY_AUTH=username:password
4 changes: 2 additions & 2 deletions .github/workflows/docker-build-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ jobs:
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64
platforms: linux/amd64,linux/arm64,linux/arm64/v8,linux/386,linux/s390x,linux/riscv64,linux/ppc64le,linux/arm/v7
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
cache-to: type=gha,mode=max
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ variables take precedence when both are provided.
| `PROXY_HEADER_QUEUES` | `-header-queues` | X-Amz-Security-Token | Comma-separated headers for dedicated queues |
| `PROXY_PERSISTENT_HEADERS` | `-persistent-headers` | (empty) | Persistent headers that cannot be overwritten 🔒 |
| `PROXY_TIMEOUT` | `-timeout` | 0 | Request timeout in seconds (0 = infinite ⏳) |
| `PROXY_URL` | `-proxy-url` | (empty) | Proxy URL for TARGET_HOST connections 🌐 |
| `PROXY_AUTH` | `-proxy-auth` | (empty) | Proxy authentication (username:password) 🔐 |

### Persistent Headers 🔒

Expand Down Expand Up @@ -183,6 +185,64 @@ export PROXY_PERSISTENT_HEADERS="User-Agent:mobile,X-Custom:always-present"
./proxy-queue -target-host=api.example.com
```

### Proxy Configuration 🌐

The proxy-queue can route all TARGET_HOST connections through an upstream proxy server. This is useful when your network requires proxy access to reach external services, or when you need to add an additional layer of routing.

#### Supported Proxy Types

| Proxy Type | URL Format | Features |
|---------------|------------------------------------|----------------------------------------------------|
| **HTTP** | `http://proxy.example.com:8080` | ✅ HTTP requests<br>❌ Socket connections |
| **HTTPS** | `https://proxy.example.com:8080` | ✅ HTTP requests<br>❌ Socket connections |
| **SOCKS5** | `socks5://proxy.example.com:1080` | ✅ HTTP requests<br>✅ Socket connections |

#### Configuration Methods

```bash
# Command line flags
./proxy-queue \
-target-host=api.example.com \
-proxy-url=socks5://proxy.company.com:1080 \
-proxy-auth=username:password

# Environment variables
export PROXY_URL=http://corporate-proxy:8080
export PROXY_AUTH=myuser:mypass
./proxy-queue -target-host=api.example.com

# Docker environment
docker run -e PROXY_URL=socks5://proxy:1080 -e PROXY_AUTH=user:pass proxy-queue
```

#### Authentication Support

The proxy supports username/password authentication for all proxy types:

```bash
# Include auth in URL (HTTP/HTTPS only)
export PROXY_URL=http://username:password@proxy.example.com:8080

# Separate auth parameter (recommended for security)
export PROXY_URL=socks5://proxy.example.com:1080
export PROXY_AUTH=username:password
```

#### Security Features

- **No credential logging**: Proxy authentication is never logged in plaintext
- **Fallback behavior**: If proxy connection fails, falls back to direct connection
- **Connection validation**: Invalid proxy URLs are detected and logged
- **Debug visibility**: Proxy usage is logged at debug level for troubleshooting

#### Proxy vs Direct Connection

| Connection Type | Proxy Required | Fallback Behavior |
|-----------------|----------------|--------------------------------------|
| **HTTP/HTTPS** | Optional | Falls back to direct connection |
| **Socket** | Optional | Falls back to direct connection |
| **Both** | Optional | Each connection type handles its own |

### Header-Based Queue Routing 📤

The proxy supports routing requests to dedicated queues based on specific HTTP headers. This is particularly
Expand Down
Binary file modified build/proxy-queue-darwin-amd64
Binary file not shown.
Binary file modified build/proxy-queue-darwin-arm64
Binary file not shown.
Binary file modified build/proxy-queue-linux-amd64
Binary file not shown.
Binary file modified build/proxy-queue-linux-arm64
Binary file not shown.
Binary file modified build/proxy-queue-windows-amd64.exe
Binary file not shown.
Binary file modified build/proxy-queue-windows-arm64.exe
Binary file not shown.
8 changes: 6 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.8'

services:
proxy-queue:
image: thanhlvcom/proxy-queue:latest
image: thanhlvcom/proxy-queue:v1.2.0
ports:
- "6789:6789" # HTTP/HTTPS proxy
- "6799:6799" # Socket proxy
Expand All @@ -23,9 +23,13 @@ services:
- PROXY_HEADER_QUEUES=X-Amz-Security-Token,Authorization
- PROXY_PERSISTENT_HEADERS=x-token-test:token-1234
- PROXY_LOG_LEVEL=info
# Optional proxy configuration (uncomment and configure as needed)
# - PROXY_URL=http://corporate-proxy:8080
# - PROXY_URL=socks5://proxy.company.com:1080
# - PROXY_AUTH=username:password
restart: always
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:6789/health"]
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8081/health"]
interval: 30s
timeout: 10s
retries: 3
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ toolchain go1.24.7
require (
github.com/prometheus/client_golang v1.23.2
github.com/sirupsen/logrus v1.9.3
golang.org/x/net v0.43.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
Expand Down
162 changes: 155 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"golang.org/x/net/proxy"
)

type Config struct {
Expand All @@ -39,6 +40,8 @@ type Config struct {
HeaderQueues []string // Headers to create separate queues for (e.g., ["X-Amz-Security-Token", "X-Amz-Content-Sha256"])
PersistentHeaders map[string]string // Headers that are always added and cannot be overwritten by clients (e.g., {"User-Agent": "mobile"})
Timeout time.Duration // Request timeout (0 = infinite)
ProxyURL string // Proxy URL (e.g., "http://proxy:8080", "socks5://proxy:1080")
ProxyAuth string // Proxy authentication in format "username:password"
}

type ProxyRequest struct {
Expand Down Expand Up @@ -612,6 +615,8 @@ func main() {
headerQueues = flag.String("header-queues", "X-Amz-Security-Token", "Comma-separated list of headers to create separate queues for (e.g., 'X-Amz-Security-Token,Authorization')")
persistentHeaders = flag.String("persistent-headers", "", "Comma-separated list of headers to always add (format: 'key1:value1,key2:value2')")
timeout = flag.Int("timeout", 0, "Request timeout in seconds (0 = infinite ⏳)")
proxyURL = flag.String("proxy-url", "", "Proxy URL for TARGET_HOST connections (e.g., 'http://proxy:8080', 'socks5://proxy:1080')")
proxyAuth = flag.String("proxy-auth", "", "Proxy authentication in format 'username:password'")
)
flag.Parse()

Expand All @@ -637,6 +642,8 @@ func main() {
HeaderQueues: headerQueuesList,
PersistentHeaders: persistentHeadersMap,
Timeout: time.Duration(getIntFromEnvOrFlag("PROXY_TIMEOUT", timeout, "timeout", 0)) * time.Second,
ProxyURL: getStringFromEnvOrFlag("PROXY_URL", proxyURL, "proxy-url", ""),
ProxyAuth: getStringFromEnvOrFlag("PROXY_AUTH", proxyAuth, "proxy-auth", ""),
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -662,6 +669,8 @@ func main() {
"header_queues": config.HeaderQueues,
"persistent_headers": config.PersistentHeaders,
"timeout": config.Timeout,
"proxy_url": config.ProxyURL,
"proxy_auth_set": config.ProxyAuth != "",
},
"timestamp": time.Now().UTC().Format(time.RFC3339Nano),
}).Debug("🚀 Proxy Queue Manager Configuration Loaded")
Expand Down Expand Up @@ -713,6 +722,8 @@ func main() {
"header_queues": config.HeaderQueues,
"persistent_headers": config.PersistentHeaders,
"timeout": config.Timeout,
"proxy_url": config.ProxyURL,
"proxy_auth_set": config.ProxyAuth != "",
"timestamp": time.Now().UTC().Format(time.RFC3339Nano),
}).Info("🎯 Proxy server started and ready to accept connections")

Expand Down Expand Up @@ -798,6 +809,115 @@ func startHealthServer(queueManager *QueueManager, config *Config) {
}
}

func (pq *ProxyQueue) createDialerWithProxy() func(network, address string) (net.Conn, error) {
if pq.config.ProxyURL == "" {
return net.Dial
}

proxyURL, err := url.Parse(pq.config.ProxyURL)
if err != nil {
pq.logger.WithFields(logrus.Fields{
"proxy_url": pq.config.ProxyURL,
"error": err,
}).Error("Failed to parse proxy URL for socket connection, using direct connection")
return net.Dial
}

switch proxyURL.Scheme {
case "http", "https":
pq.logger.Warn("HTTP proxy not supported for socket connections, using direct connection")
return net.Dial

case "socks5":
var auth *proxy.Auth
if pq.config.ProxyAuth != "" {
parts := strings.SplitN(pq.config.ProxyAuth, ":", 2)
if len(parts) == 2 {
auth = &proxy.Auth{
User: parts[0],
Password: parts[1],
}
}
}

dialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
if err != nil {
pq.logger.WithFields(logrus.Fields{
"proxy_url": pq.config.ProxyURL,
"error": err,
}).Error("Failed to create SOCKS5 proxy dialer for socket connection, using direct connection")
return net.Dial
}

pq.logger.WithField("proxy_url", pq.config.ProxyURL).Debug("🧦 Using SOCKS5 proxy for socket connection")
return dialer.Dial

default:
pq.logger.WithField("proxy_scheme", proxyURL.Scheme).Warn("Unsupported proxy scheme for socket connection, using direct connection")
return net.Dial
}
}

func (pq *ProxyQueue) createTransportWithProxy() *http.Transport {
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}

if pq.config.ProxyURL != "" {
proxyURL, err := url.Parse(pq.config.ProxyURL)
if err != nil {
pq.logger.WithFields(logrus.Fields{
"proxy_url": pq.config.ProxyURL,
"error": err,
}).Error("Failed to parse proxy URL, using direct connection")
return transport
}

switch proxyURL.Scheme {
case "http", "https":
if pq.config.ProxyAuth != "" {
parts := strings.SplitN(pq.config.ProxyAuth, ":", 2)
if len(parts) == 2 {
proxyURL.User = url.UserPassword(parts[0], parts[1])
}
}
transport.Proxy = http.ProxyURL(proxyURL)
pq.logger.WithField("proxy_url", pq.config.ProxyURL).Debug("🌐 Using HTTP(S) proxy")

case "socks5":
var auth *proxy.Auth
if pq.config.ProxyAuth != "" {
parts := strings.SplitN(pq.config.ProxyAuth, ":", 2)
if len(parts) == 2 {
auth = &proxy.Auth{
User: parts[0],
Password: parts[1],
}
}
}

dialer, err := proxy.SOCKS5("tcp", proxyURL.Host, auth, proxy.Direct)
if err != nil {
pq.logger.WithFields(logrus.Fields{
"proxy_url": pq.config.ProxyURL,
"error": err,
}).Error("Failed to create SOCKS5 proxy dialer, using direct connection")
return transport
}

transport.Dial = dialer.Dial
pq.logger.WithField("proxy_url", pq.config.ProxyURL).Debug("🧦 Using SOCKS5 proxy")

default:
pq.logger.WithField("proxy_scheme", proxyURL.Scheme).Warn("Unsupported proxy scheme, using direct connection")
}
}

return transport
}

func (pq *ProxyQueue) getHealthStatus() map[string]interface{} {
pq.mu.RLock()
running := pq.running
Expand Down Expand Up @@ -886,12 +1006,8 @@ func (pq *ProxyQueue) processHTTPRequest(req ProxyRequest) {
}

client := &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
Timeout: timeout,
Transport: pq.createTransportWithProxy(),
}

// Forward the request
Expand Down Expand Up @@ -1176,7 +1292,39 @@ func (pq *ProxyQueue) processSocketRequest(req ProxyRequest) {

connectionStartTime := time.Now()

targetConn, err := net.DialTimeout("tcp", targetAddr, socketTimeout)
// Use proxy-aware dialer
dialer := pq.createDialerWithProxy()

// Create a context with timeout for the connection
ctx, cancel := context.WithTimeout(context.Background(), socketTimeout)
defer cancel()

// Channel to handle the connection result
connChan := make(chan net.Conn, 1)
errChan := make(chan error, 1)

go func() {
conn, err := dialer("tcp", targetAddr)
if err != nil {
errChan <- err
} else {
connChan <- conn
}
}()

var targetConn net.Conn
var err error

select {
case targetConn = <-connChan:
// Connection successful
case err = <-errChan:
// Connection failed
case <-ctx.Done():
// Timeout
err = fmt.Errorf("connection timeout after %v", socketTimeout)
}

connectionTime := time.Since(connectionStartTime)
if err != nil {
pq.logger.WithFields(logrus.Fields{
Expand Down
Loading