Skip to content

Commit 9dd8aa6

Browse files
committed
Added s3 backend for container logging
1 parent 796a1b6 commit 9dd8aa6

8 files changed

Lines changed: 331 additions & 42 deletions

File tree

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,14 @@
232232
<artifactId>jquery</artifactId>
233233
<version>3.3.1</version>
234234
</dependency>
235+
236+
<!-- Amazon S3 -->
237+
<dependency>
238+
<groupId>com.amazonaws</groupId>
239+
<artifactId>aws-java-sdk-s3</artifactId>
240+
<version>1.11.563</version>
241+
</dependency>
242+
235243
</dependencies>
236244

237245
<build>
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import java.io.IOException;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Date;
6+
7+
import javax.inject.Inject;
8+
9+
import org.springframework.core.env.Environment;
10+
11+
import eu.openanalytics.containerproxy.model.runtime.Proxy;
12+
13+
public abstract class AbstractLogStorage implements ILogStorage {
14+
15+
private static final String PARAM_LOG_PATHS = "log_paths";
16+
17+
@Inject
18+
protected Environment environment;
19+
20+
protected String containerLogPath;
21+
22+
@Override
23+
public void initialize() throws IOException {
24+
containerLogPath = environment.getProperty("proxy.container-log-path");
25+
}
26+
27+
@Override
28+
public String getStorageLocation() {
29+
return containerLogPath;
30+
}
31+
32+
@Override
33+
public String[] getLogs(Proxy proxy) throws IOException {
34+
String[] paths = (String[]) proxy.getContainers().get(0).getParameters().get(PARAM_LOG_PATHS);
35+
if (paths == null) {
36+
String timestamp = new SimpleDateFormat("yyyyMMdd").format(new Date());
37+
paths = new String[] {
38+
String.format("%s/%s_%s_%s_stdout.log", containerLogPath, proxy.getSpec().getId(), proxy.getId(), timestamp),
39+
String.format("%s/%s_%s_%s_stderr.log", containerLogPath, proxy.getSpec().getId(), proxy.getId(), timestamp)
40+
};
41+
proxy.getContainers().get(0).getParameters().put(PARAM_LOG_PATHS, paths);
42+
}
43+
return paths;
44+
}
45+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import java.io.FileOutputStream;
4+
import java.io.IOException;
5+
import java.io.OutputStream;
6+
import java.nio.file.Files;
7+
import java.nio.file.Paths;
8+
9+
import eu.openanalytics.containerproxy.model.runtime.Proxy;
10+
11+
public class FileLogStorage extends AbstractLogStorage {
12+
13+
@Override
14+
public void initialize() throws IOException {
15+
super.initialize();
16+
Files.createDirectories(Paths.get(containerLogPath));
17+
}
18+
19+
@Override
20+
public OutputStream[] createOutputStreams(Proxy proxy) throws IOException {
21+
String[] paths = getLogs(proxy);
22+
return new OutputStream[] {
23+
new FileOutputStream(paths[0]),
24+
new FileOutputStream(paths[1])
25+
};
26+
}
27+
28+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import java.io.IOException;
4+
import java.io.OutputStream;
5+
6+
import eu.openanalytics.containerproxy.model.runtime.Proxy;
7+
8+
public interface ILogStorage {
9+
10+
public void initialize() throws IOException;
11+
12+
public String getStorageLocation();
13+
14+
public OutputStream[] createOutputStreams(Proxy proxy) throws IOException;
15+
16+
public String[] getLogs(Proxy proxy) throws IOException;
17+
18+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import javax.inject.Inject;
4+
5+
import org.springframework.beans.factory.config.AbstractFactoryBean;
6+
import org.springframework.context.ApplicationContext;
7+
import org.springframework.context.annotation.Primary;
8+
import org.springframework.core.env.Environment;
9+
import org.springframework.stereotype.Service;
10+
11+
@Service(value="logStorage")
12+
@Primary
13+
public class LogStorageFactory extends AbstractFactoryBean<ILogStorage> {
14+
15+
@Inject
16+
private Environment environment;
17+
18+
@Inject
19+
private ApplicationContext applicationContext;
20+
21+
@Override
22+
public Class<?> getObjectType() {
23+
return ILogStorage.class;
24+
}
25+
26+
@Override
27+
protected ILogStorage createInstance() throws Exception {
28+
ILogStorage storage = null;
29+
30+
String containerLogPath = environment.getProperty("proxy.container-log-path");
31+
if (containerLogPath == null || containerLogPath.trim().isEmpty()) {
32+
storage = new NoopLogStorage();
33+
} else if (containerLogPath.toLowerCase().startsWith("s3://")) {
34+
storage = new S3LogStorage();
35+
} else {
36+
storage = new FileLogStorage();
37+
}
38+
39+
applicationContext.getAutowireCapableBeanFactory().autowireBean(storage);
40+
return storage;
41+
}
42+
43+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import java.io.OutputStream;
4+
5+
import eu.openanalytics.containerproxy.model.runtime.Proxy;
6+
7+
public class NoopLogStorage extends AbstractLogStorage {
8+
9+
@Override
10+
public void initialize() {
11+
// Do nothing.
12+
}
13+
14+
@Override
15+
public OutputStream[] createOutputStreams(Proxy proxy) {
16+
return null;
17+
}
18+
19+
@Override
20+
public String[] getLogs(Proxy proxy) {
21+
return null;
22+
}
23+
24+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package eu.openanalytics.containerproxy.log;
2+
3+
import java.io.BufferedInputStream;
4+
import java.io.BufferedOutputStream;
5+
import java.io.ByteArrayInputStream;
6+
import java.io.ByteArrayOutputStream;
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
import java.io.OutputStream;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.bouncycastle.util.Arrays;
14+
15+
import com.amazonaws.AmazonClientException;
16+
import com.amazonaws.auth.AWSStaticCredentialsProvider;
17+
import com.amazonaws.auth.BasicAWSCredentials;
18+
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
19+
import com.amazonaws.services.s3.AmazonS3;
20+
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
21+
import com.amazonaws.services.s3.model.ObjectMetadata;
22+
import com.amazonaws.services.s3.model.S3Object;
23+
import com.amazonaws.services.s3.transfer.TransferManager;
24+
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
25+
26+
import eu.openanalytics.containerproxy.model.runtime.Proxy;
27+
28+
//TODO Optimize flushing behaviour
29+
public class S3LogStorage extends AbstractLogStorage {
30+
31+
private AmazonS3 s3;
32+
private TransferManager transferMgr;
33+
34+
private String bucketName;
35+
private String bucketPath;
36+
private boolean enableSSE;
37+
38+
private Logger log = LogManager.getLogger(S3LogStorage.class);
39+
40+
@Override
41+
public void initialize() throws IOException {
42+
super.initialize();
43+
44+
String accessKey = environment.getProperty("proxy.container-log-s3-access-key");
45+
String accessSecret = environment.getProperty("proxy.container-log-s3-access-secret");
46+
String endpoint = environment.getProperty("proxy.container-log-s3-endpoint", "https://s3-eu-west-1.amazonaws.com");
47+
enableSSE = Boolean.valueOf(environment.getProperty("proxy.container-log-s3-sse", "false"));
48+
49+
String subPath = containerLogPath.substring("s3://".length());
50+
int bucketPathIndex = subPath.indexOf("/");
51+
bucketName = subPath.substring(0, bucketPathIndex);
52+
bucketPath = subPath.substring(bucketPathIndex + 1);
53+
54+
s3 = AmazonS3ClientBuilder.standard()
55+
.withEndpointConfiguration(new EndpointConfiguration(endpoint, null))
56+
.enablePathStyleAccess()
57+
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, accessSecret)))
58+
.build();
59+
transferMgr = TransferManagerBuilder.standard()
60+
.withS3Client(s3)
61+
.build();
62+
}
63+
64+
@Override
65+
public OutputStream[] createOutputStreams(Proxy proxy) throws IOException {
66+
String[] paths = getLogs(proxy);
67+
OutputStream[] streams = new OutputStream[2];
68+
for (int i = 0; i < streams.length; i++) {
69+
String fileName = paths[i].substring(paths[i].lastIndexOf("/") + 1);
70+
streams[i] = new BufferedOutputStream(new S3OutputStream(bucketPath + "/" + fileName), 1024*1024);
71+
}
72+
return streams;
73+
}
74+
75+
private void doUpload(String key, byte[] bytes) throws IOException {
76+
byte[] bytesToUpload = bytes;
77+
78+
byte[] originalBytes = getContent(key);
79+
if (originalBytes != null) {
80+
bytesToUpload = Arrays.copyOf(originalBytes, originalBytes.length + bytes.length);
81+
System.arraycopy(bytes, 0, bytesToUpload, originalBytes.length, bytes.length);
82+
}
83+
84+
ObjectMetadata metadata = new ObjectMetadata();
85+
metadata.setContentLength(bytesToUpload.length);
86+
if (enableSSE) metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
87+
88+
if (log.isDebugEnabled()) log.debug(String.format("Writing log file to S3 [size: %d] [path: %s]", bytesToUpload.length, key));
89+
90+
InputStream bufferedInput = new BufferedInputStream(new ByteArrayInputStream(bytesToUpload), 20*1024*1024);
91+
try {
92+
transferMgr.upload(bucketName, key, bufferedInput, metadata).waitForCompletion();
93+
} catch (AmazonClientException | InterruptedException e) {
94+
throw new IOException(e);
95+
}
96+
}
97+
98+
private byte[] getContent(String key) throws IOException {
99+
if (s3.doesObjectExist(bucketName, key)) {
100+
S3Object o = s3.getObject(bucketName, key);
101+
ByteArrayOutputStream out = new ByteArrayOutputStream();
102+
try (InputStream in = o.getObjectContent()) {
103+
byte[] buffer = new byte[40*1024];
104+
int len = 0;
105+
while ((len = in.read(buffer)) > 0) {
106+
out.write(buffer, 0, len);
107+
}
108+
}
109+
return out.toByteArray();
110+
} else {
111+
return null;
112+
}
113+
}
114+
115+
private class S3OutputStream extends OutputStream {
116+
117+
private String s3Key;
118+
119+
public S3OutputStream(String s3Key) {
120+
this.s3Key = s3Key;
121+
}
122+
123+
@Override
124+
public void write(int b) throws IOException {
125+
// Warning: highly inefficient. Always write arrays.
126+
byte[] bytesToCopy = new byte[] { (byte) b };
127+
write(bytesToCopy, 0, 1);
128+
}
129+
130+
@Override
131+
public void write(byte[] b, int off, int len) throws IOException {
132+
byte[] bytesToCopy = new byte[len];
133+
System.arraycopy(b, off, bytesToCopy, 0, len);
134+
doUpload(s3Key, bytesToCopy);
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)