Skip to content

Commit 3c58a8a

Browse files
authored
Merge pull request #80 from PgBulkInsert/errorprone
Errorprone
2 parents d08a35e + 84a8f36 commit 3c58a8a

38 files changed

Lines changed: 557 additions & 595 deletions

File tree

PgBulkInsert/pgbulkinsert-bulkprocessor/src/main/java/de/bytefish/pgbulkinsert/bulkprocessor/BulkProcessor.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,22 @@
33
package de.bytefish.pgbulkinsert.bulkprocessor;
44

55
import de.bytefish.pgbulkinsert.bulkprocessor.handler.IBulkWriteHandler;
6+
import org.checkerframework.checker.nullness.qual.Nullable;
67

78
import java.time.Duration;
89
import java.util.ArrayList;
910
import java.util.List;
10-
import java.util.concurrent.*;
11+
import java.util.Optional;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ScheduledFuture;
14+
import java.util.concurrent.ScheduledThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
1116

1217
public class BulkProcessor<TEntity> implements AutoCloseable {
1318

19+
@Nullable
1420
private final ScheduledThreadPoolExecutor scheduler;
21+
@Nullable
1522
private final ScheduledFuture<?> scheduledFuture;
1623

1724
private volatile boolean closed = false;
@@ -26,7 +33,7 @@ public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize) {
2633
this(handler, bulkSize, null);
2734
}
2835

29-
public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize, Duration flushInterval) {
36+
public BulkProcessor(IBulkWriteHandler<TEntity> handler, int bulkSize, @Nullable Duration flushInterval) {
3037

3138
this.handler = handler;
3239
this.bulkSize = bulkSize;
@@ -61,10 +68,8 @@ public void close() throws Exception {
6168
closed = true;
6269

6370
// Quit the Scheduled FlushInterval Future:
64-
if (this.scheduledFuture != null) {
65-
cancel(this.scheduledFuture);
66-
this.scheduler.shutdown();
67-
}
71+
Optional.ofNullable(this.scheduledFuture).ifPresent(future -> future.cancel(false));
72+
Optional.ofNullable(this.scheduler).ifPresent(ScheduledThreadPoolExecutor::shutdown);
6873

6974
// Are there any entities left to write?
7075
if (batchedEntities.size() > 0) {
@@ -96,13 +101,6 @@ private void write(List<TEntity> entities) {
96101
}
97102
}
98103

99-
public static boolean cancel(Future<?> future) {
100-
if (future != null) {
101-
return future.cancel(false);
102-
}
103-
return false;
104-
}
105-
106104
class Flush implements Runnable {
107105

108106
@Override

PgBulkInsert/pgbulkinsert-bulkprocessor/src/main/java/de/bytefish/pgbulkinsert/bulkprocessor/handler/BulkWriteHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@
22

33
package de.bytefish.pgbulkinsert.bulkprocessor.handler;
44

5+
import de.bytefish.pgbulkinsert.IPgBulkInsert;
6+
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
7+
import org.postgresql.PGConnection;
8+
59
import java.sql.Connection;
610
import java.util.List;
711
import java.util.function.Supplier;
812

9-
import org.postgresql.PGConnection;
10-
11-
import de.bytefish.pgbulkinsert.IPgBulkInsert;
12-
import de.bytefish.pgbulkinsert.util.PostgreSqlUtils;
13-
1413
public class BulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {
1514

1615
private final IPgBulkInsert<TEntity> client;
@@ -22,6 +21,7 @@ public BulkWriteHandler(IPgBulkInsert<TEntity> client, Supplier<Connection> conn
2221
this.connectionFactory = connectionFactory;
2322
}
2423

24+
@Override
2525
public void write(List<TEntity> entities) throws Exception {
2626
// Obtain a new Connection and execute it in a try with resources block, so it gets closed properly:
2727
try(Connection connection = connectionFactory.get()) {

PgBulkInsert/pgbulkinsert-bulkprocessor/src/test/java/de/bytefish/pgbulkinsert/test/bulkprocessor/BulkProcessorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ protected void onSetUpInTransaction() throws Exception {
3131
}
3232

3333
// Define a Custom Handler for the Unit Test, which does not close the Connection:
34-
class CustomBulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {
34+
static class CustomBulkWriteHandler<TEntity> implements IBulkWriteHandler<TEntity> {
3535

3636
private final IPgBulkInsert<TEntity> client;
3737
private final Supplier<Connection> connectionFactory;
@@ -41,6 +41,7 @@ public CustomBulkWriteHandler(IPgBulkInsert<TEntity> client, Supplier<Connection
4141
this.connectionFactory = connectionFactory;
4242
}
4343

44+
@Override
4445
public void write(List<TEntity> entities) throws Exception {
4546
Connection connection = connectionFactory.get();
4647

PgBulkInsert/pgbulkinsert-core/src/main/java/de/bytefish/pgbulkinsert/PgBulkInsert.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.sql.SQLException;
1414
import java.util.Collection;
1515
import java.util.Objects;
16-
import java.util.function.BiConsumer;
1716
import java.util.stream.Stream;
1817

1918
public class PgBulkInsert<TEntity> implements IPgBulkInsert<TEntity> {
@@ -34,28 +33,17 @@ public PgBulkInsert(IConfiguration configuration, AbstractMapping<TEntity> mappi
3433
this.mapping = mapping;
3534
}
3635

36+
@Override
3737
public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {
38-
39-
try (PgBinaryWriter bw = new PgBinaryWriter(configuration.getBufferSize())) {
40-
41-
// Wrap the CopyOutputStream in our own Writer:
42-
bw.open(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1));
43-
38+
// Wrap the CopyOutputStream in our own Writer:
39+
try (PgBinaryWriter bw = new PgBinaryWriter(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1), configuration.getBufferSize())) {
4440
// Insert Each Column:
4541
entities.forEach(entity -> saveEntitySynchonized(bw, entity));
4642
}
4743
}
4844

4945
public void saveAll(PGConnection connection, Collection<TEntity> entities) throws SQLException {
50-
51-
try (PgBinaryWriter bw = new PgBinaryWriter(configuration.getBufferSize())) {
52-
53-
// Wrap the CopyOutputStream in our own Writer:
54-
bw.open(new PGCopyOutputStream(connection, mapping.getCopyCommand(), 1));
55-
56-
// Insert Each Column:
57-
entities.forEach(entity -> saveEntity(bw, entity));
58-
}
46+
saveAll(connection, entities.stream());
5947
}
6048

6149
private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
@@ -71,7 +59,7 @@ private void saveEntity(PgBinaryWriter bw, TEntity entity) throws SaveEntityFail
7159
throw new SaveEntityFailedException(e);
7260
}
7361
}
74-
62+
7563
private void saveEntitySynchonized(PgBinaryWriter bw, TEntity entity) throws SaveEntityFailedException {
7664
synchronized (bw) {
7765
saveEntity(bw, entity);

PgBulkInsert/pgbulkinsert-core/src/main/java/de/bytefish/pgbulkinsert/pgsql/PgBinaryWriter.java

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
66
import de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;
7+
import org.checkerframework.checker.nullness.qual.Nullable;
78

89
import java.io.BufferedOutputStream;
910
import java.io.DataOutputStream;
@@ -12,22 +13,15 @@
1213

1314
public class PgBinaryWriter implements AutoCloseable {
1415

15-
private transient DataOutputStream buffer;
16+
private final transient DataOutputStream buffer;
1617

17-
private final int bufferSize;
18-
19-
public PgBinaryWriter() {
20-
this(65536);
21-
}
22-
23-
public PgBinaryWriter(int bufferSize) {
24-
this.bufferSize = bufferSize;
18+
public PgBinaryWriter(final OutputStream out) {
19+
this(out, 65536);
2520
}
2621

27-
public void open(final OutputStream out) {
28-
buffer = new DataOutputStream(new BufferedOutputStream(out, bufferSize));
29-
30-
writeHeader();
22+
public PgBinaryWriter(final OutputStream out, final int bufferSize) {
23+
buffer = new DataOutputStream(new BufferedOutputStream(out, bufferSize));
24+
writeHeader();
3125
}
3226

3327
public void startRow(int numColumns) {
@@ -43,15 +37,15 @@ public void startRow(int numColumns) {
4337
}
4438
}
4539

46-
public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
40+
public <TTargetType> void write(final IValueHandler<TTargetType> handler, @Nullable final TTargetType value) {
4741
handler.handle(buffer, value);
4842
}
49-
43+
5044
/**
5145
* Writes primitive boolean to the output stream
52-
*
46+
*
5347
* @param value value to write
54-
*
48+
*
5549
*/
5650
public void writeBoolean(boolean value) {
5751
try {
@@ -71,12 +65,12 @@ public void writeBoolean(boolean value) {
7165
}
7266
}
7367

74-
68+
7569
/**
7670
* Writes primitive byte to the output stream
77-
*
71+
*
7872
* @param value value to write
79-
*
73+
*
8074
*/
8175
public void writeByte(int value) {
8276
try {
@@ -94,9 +88,9 @@ public void writeByte(int value) {
9488

9589
/**
9690
* Writes primitive short to the output stream
97-
*
91+
*
9892
* @param value value to write
99-
*
93+
*
10094
*/
10195
public void writeShort(int value) {
10296
try {
@@ -114,9 +108,9 @@ public void writeShort(int value) {
114108

115109
/**
116110
* Writes primitive integer to the output stream
117-
*
111+
*
118112
* @param value value to write
119-
*
113+
*
120114
*/
121115
public void writeInt(int value) {
122116
try {
@@ -134,9 +128,9 @@ public void writeInt(int value) {
134128

135129
/**
136130
* Writes primitive long to the output stream
137-
*
131+
*
138132
* @param value value to write
139-
*
133+
*
140134
*/
141135
public void writeLong(long value) {
142136
try {
@@ -151,12 +145,12 @@ public void writeLong(long value) {
151145
}
152146
}
153147
}
154-
148+
155149
/**
156150
* Writes primitive float to the output stream
157-
*
151+
*
158152
* @param value value to write
159-
*
153+
*
160154
*/
161155
public void writeFloat(float value) {
162156
try {
@@ -174,9 +168,9 @@ public void writeFloat(float value) {
174168

175169
/**
176170
* Writes primitive double to the output stream
177-
*
171+
*
178172
* @param value value to write
179-
*
173+
*
180174
*/
181175
public void writeDouble(double value) {
182176
try {
@@ -224,7 +218,7 @@ public void close() {
224218
}
225219
}
226220
}
227-
221+
228222
private void writeHeader() {
229223
try {
230224

PgBulkInsert/pgbulkinsert-core/src/main/java/de/bytefish/pgbulkinsert/pgsql/handlers/BaseValueHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
package de.bytefish.pgbulkinsert.pgsql.handlers;
44

55
import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
6+
import org.checkerframework.checker.nullness.qual.Nullable;
67

78
import java.io.DataOutputStream;
89

910
public abstract class BaseValueHandler<T> implements IValueHandler<T> {
1011

1112
@Override
12-
public void handle(DataOutputStream buffer, final T value) {
13+
public void handle(DataOutputStream buffer, @Nullable final T value) {
1314
try {
1415
if (value == null) {
1516
buffer.writeInt(-1);

0 commit comments

Comments
 (0)