Skip to content

Commit 2997ff9

Browse files
committed
fixed a problem with GC with no subscriptions
1 parent 82c1f22 commit 2997ff9

6 files changed

Lines changed: 45 additions & 26 deletions

File tree

clusterhq/queue/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@
5959
<scope>test</scope>
6060
<version>3.2.0</version>
6161
</dependency>
62-
<dependency>
63-
<groupId>org.testng</groupId>
64-
<artifactId>testng</artifactId>
65-
<version>6.8.8</version>
66-
<scope>test</scope>
67-
</dependency>
6862
<dependency>
6963
<groupId>com.google.guava</groupId>
7064
<artifactId>guava</artifactId>

clusterhq/queue/queue.iml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@
4444
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.3" level="project" />
4545
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
4646
<orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:3.2.0" level="project" />
47-
<orderEntry type="library" scope="TEST" name="Maven: org.testng:testng:6.8.8" level="project" />
48-
<orderEntry type="library" scope="TEST" name="Maven: org.beanshell:bsh:2.0b4" level="project" />
49-
<orderEntry type="library" scope="TEST" name="Maven: com.beust:jcommander:1.27" level="project" />
5047
<orderEntry type="library" name="Maven: com.google.guava:guava:18.0" level="project" />
5148
</component>
5249
</module>

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/DataStore.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Path getChunkPath(long baseLSN) {
6363
return dir.resolve(name);
6464
}
6565

66-
public long getNextLSN() {
66+
public synchronized long getNextLSN() {
6767
return nextLSN;
6868
}
6969

@@ -111,10 +111,12 @@ public synchronized long post(InputStream data) throws IOException {
111111
throw new IllegalArgumentException(nwritten + " != 8");
112112
}
113113
out.force(true);
114-
nextLSN += total + 8;
115-
if (getBaseLSN(nextLSN) != baseLSN) {
116-
// we exceeded this chunk, round this up to the next chunk
117-
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
114+
synchronized (this) {
115+
nextLSN += total + 8;
116+
if (getBaseLSN(nextLSN) != baseLSN) {
117+
// we exceeded this chunk, round this up to the next chunk
118+
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
119+
}
118120
}
119121
} catch (IOException e) {
120122
// truncate the file back to the original size
@@ -191,12 +193,12 @@ public Message get(long lsn) throws IOException {
191193
String.format("%s/%s is %s bytes but chunk is %s bytes", relativeLSN, lsn, messageSize, chunkSize));
192194
}
193195

194-
long nextLSN = lsn + 8 + messageSize;
195-
if (getBaseLSN(nextLSN) != baseLSN) {
196+
long lsnAfter = lsn + 8 + messageSize;
197+
if (getBaseLSN(lsnAfter) != baseLSN) {
196198
// we exceeded this chunk, round this up to the next chunk
197-
nextLSN = getBaseLSN(nextLSN) + CHUNK_SIZE;
199+
lsnAfter = getBaseLSN(lsnAfter) + CHUNK_SIZE;
198200
}
199-
return new Message(ByteStreams.limit(Channels.newInputStream(in), messageSize), nextLSN);
201+
return new Message(ByteStreams.limit(Channels.newInputStream(in), messageSize), lsnAfter);
200202
} catch (BufferUnderflowException e) {
201203
throw new IOException(e);
202204
} catch (IOException e) {
Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,36 @@
11
package us.hxbc.clusterhq.queue;
22

3+
import org.glassfish.grizzly.http.server.HttpServer;
34
import org.glassfish.jersey.filter.LoggingFilter;
45
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
56
import org.glassfish.jersey.server.ResourceConfig;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89

10+
import java.io.IOException;
911
import java.net.URI;
1012
import java.nio.file.Files;
1113
import java.nio.file.Path;
1214
import java.nio.file.Paths;
1315

1416
public class Main {
1517
private static Logger logger = LoggerFactory.getLogger(Main.class);
18+
final HttpServer server;
1619

17-
public static void main(String[] args) {
20+
Main(int port, Path dir) {
21+
ResourceConfig rc = new ResourceConfig();
22+
rc.registerInstances(new Api(dir, 4096));
23+
if (logger.isDebugEnabled()) {
24+
rc.register(new LoggingFilter(java.util.logging.Logger.getGlobal(), false));
25+
}
26+
server = GrizzlyHttpServerFactory.createHttpServer(URI.create("http://0.0.0.0:" + port), rc, false);
27+
}
28+
29+
void start() throws IOException {
30+
server.start();
31+
}
32+
33+
public static void main(String[] args) throws IOException {
1834
if (args.length != 2) {
1935
System.err.println("Usage: Main <port> <dir>");
2036
System.exit(1);
@@ -27,11 +43,6 @@ public static void main(String[] args) {
2743
System.exit(1);
2844
}
2945

30-
ResourceConfig rc = new ResourceConfig();
31-
rc.registerInstances(new Api(dir, 4096));
32-
if (logger.isDebugEnabled()) {
33-
rc.register(new LoggingFilter(java.util.logging.Logger.getGlobal(), false));
34-
}
35-
GrizzlyHttpServerFactory.createHttpServer(URI.create("http://0.0.0.0:" + port), rc);
46+
new Main(port, dir).start();
3647
}
3748
}

clusterhq/queue/src/main/java/us/hxbc/clusterhq/queue/Queue.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package us.hxbc.clusterhq.queue;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
36
import javax.ws.rs.ClientErrorException;
47
import javax.ws.rs.core.Response;
58
import java.io.IOException;
@@ -24,6 +27,7 @@
2427
*/
2528
public class Queue {
2629
private static final long CHUNK_SIZE = 4 * 1024; // 4KB
30+
private final Logger logger = LoggerFactory.getLogger(getClass());
2731
private final Path dataDir, subscriptionDir;
2832
private final Map<String, Subscriber> subscriptions = new HashMap<>();
2933
private final DataStore dataStore;
@@ -124,7 +128,7 @@ void spawnGCThread() {
124128
}
125129

126130
synchronized void gcNow() {
127-
long curMinLSN = Long.MAX_VALUE;
131+
long curMinLSN = dataStore.getNextLSN();
128132

129133
synchronized (subscriptions) {
130134
for (Subscriber s : subscriptions.values()) {
@@ -136,6 +140,7 @@ synchronized void gcNow() {
136140
}
137141
}
138142

143+
logger.info("GC up to {}", curMinLSN);
139144
if (curMinLSN > minLSN) {
140145
dataStore.gc(curMinLSN);
141146
minLSN = curMinLSN;

clusterhq/queue/src/test/java/us/hxbc/clusterhq/queue/QueueTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,14 @@ public void testGcNone2Users() throws Exception {
173173
queue.gcNow();
174174
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
175175
}
176+
177+
@Test
178+
public void testGcAgain() throws Exception {
179+
queue.gcNow();
180+
testGcNone2Users();
181+
assertThat(stream2String(queue.get("bar").in)).isEqualTo("world");
182+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(1);
183+
queue.gcNow();
184+
assertThat(Files.list(dir.resolve("data")).count()).isEqualTo(0);
185+
}
176186
}

0 commit comments

Comments
 (0)