Reactive DB2 Client
The Reactive DB2 Client is a client for DB2 with a straightforward API focusing on scalability and low overhead.
The client is reactive and non blocking, allowing to handle many database connections with a single thread.
Features
-
Support for DB2 on Linux, Unix, and Windows
-
Limited support for DB2 on z/OS
-
Event driven
-
Lightweight
-
Built-in connection pooling
-
Prepared queries caching
-
Batch and cursor
-
Row streaming
-
RxJava 1 and RxJava 2
-
Direct memory to object without unnecessary copies
-
Java 8 Date and Time
-
SSL/TLS
-
HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy support
Current limitations
-
No stored procedures support
-
Some column types (e.g. BLOB and CLOB) are not supported
Usage
To use the Reactive DB2 Client add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-db2-client</artifactId>
<version>3.9.15</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-db2-client:3.9.15'
}
Getting started
Here is the simplest way to connect, query and disconnect
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
Connecting to DB2
Most of the time you will use a pool to connect to DB2:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);
The pooled client uses a connection pool and any operation will borrow a connection from the pool to execute the operation and release it to the pool.
If you are running with Vert.x you can pass it your Vertx instance:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);
You need to release the pool when you don’t need it anymore:
pool.close();
When you need to execute several operations on the same connection, you need to use a client
connection
.
You can easily get one from the pool:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
client.getConnection(ar1 -> {
if (ar1.succeeded()) {
System.out.println("Connected");
// Obtain our connection
SqlConnection conn = ar1.result();
// All operations execute on the same connection
conn
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar2 -> {
if (ar2.succeeded()) {
conn
.query("SELECT * FROM users WHERE id='emad'")
.execute(ar3 -> {
// Release the connection to the pool
conn.close();
});
} else {
// Release the connection to the pool
conn.close();
}
});
} else {
System.out.println("Could not connect: " + ar1.cause().getMessage());
}
});
Once you are done with the connection you must close it to release it to the pool, so it can be reused.
Configuration
There are several alternatives for you to configure the client.
data object
A simple way to configure the client is to specify a DB2ConnectOptions
data object.
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
DB2Pool pool = DB2Pool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// Handling your connection
});
You can also configure the generic properties with the setProperties
or addProperty
methods. Note setProperties
will override the default client properties.
connection uri
Apart from configuring with a DB2ConnectOptions
data object, We also provide you an alternative way to connect when you want to configure with a connection URI:
String connectionUri = "db2://dbuser:secretpassword@database.server.com:50000/mydb";
// Create the pool from the connection URI
DB2Pool pool = DB2Pool.pool(connectionUri);
// Create the connection from the connection URI
DB2Connection.connect(vertx, connectionUri, res -> {
// Handling your connection
});
The URI format for a connection string is:
db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME>
Currently the client supports the following parameter key words in connection uri
-
host
-
port
-
user
-
password
-
dbname
Note: configuring properties in connection URI will override the default properties.
Running queries
When you don’t need a transaction or run single queries, you can run queries directly on the pool; the pool will use one of its connection to run the query and return the result to you.
Here is how to run simple queries:
client
.query("SELECT * FROM users WHERE id='andy'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Prepared queries
You can do the same with prepared queries.
The SQL string can refer to parameters by position, using the database syntax `?`
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("andy"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Query methods provides an asynchronous RowSet
instance that works for SELECT queries
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
or UPDATE/INSERT queries:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Andy", "Guibert"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The Row
gives you access to your data by index
System.out.println("User " + row.getString(0) + " " + row.getString(1));
or by name
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
The client will not do any magic here and the column name is identified with the name in the table regardless of how your SQL text is.
You can access a wide variety of of types
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
You can use cached prepared statements to execute one-shot prepared queries:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
You can create a PreparedStatement
and manage the lifecycle by yourself.
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Batches
You can execute prepared batch
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julient Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
batch.add(Tuple.of("andy", "Andy Guibert"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
You can fetch generated keys by wrapping your query in SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> )
, for example:
client
.preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )")
.execute(Tuple.of("white", "red", "blue"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows.");
for (Row row : rows) {
System.out.println("generated key: " + row.getInteger("color_id"));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Using connections
When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool:
pool.getConnection(ar1 -> {
if (ar1.succeeded()) {
SqlConnection connection = ar1.result();
connection
.query("SELECT * FROM users WHERE id='andy'")
.execute(ar2 -> {
if (ar1.succeeded()) {
connection
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar3 -> {
// Do something with rows and return the connection to the pool
connection.close();
});
} else {
// Return the connection to the pool
connection.close();
}
});
}
});
Prepared queries can be created:
connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
pq.query().execute(Tuple.of("andy"), ar2 -> {
if (ar2.succeeded()) {
// All rows
RowSet<Row> rows = ar2.result();
}
});
}
});
Using transactions
Transactions with connections
You can execute transaction using SQL BEGIN
/COMMIT
/ROLLBACK
, if you do so you must use
a SqlConnection
and manage it yourself.
Or you can use the transaction API of SqlConnection
:
pool.getConnection(res -> {
if (res.succeeded()) {
// Transaction must use a connection
SqlConnection conn = res.result();
// Begin the transaction
Transaction tx = conn.begin();
// Various statements
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute(ar1 -> {
if (ar1.succeeded()) {
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute(ar2 -> {
if (ar2.succeeded()) {
// Commit the transaction
tx.commit(ar3 -> {
if (ar3.succeeded()) {
System.out.println("Transaction succeeded");
} else {
System.out.println("Transaction failed " + ar3.cause().getMessage());
}
// Return the connection to the pool
conn.close();
});
} else {
// Return the connection to the pool
conn.close();
}
});
} else {
// Return the connection to the pool
conn.close();
}
});
}
});
When the database server reports the current transaction is failed (e.g the infamous current transaction is aborted, commands ignored until
end of transaction block), the transaction is rollbacked and the abortHandler
is called:
tx.abortHandler(v -> {
System.out.println("Transaction failed => rollbacked");
});
Simplified transaction API
When you use a pool, you can start a transaction directly on the pool.
It borrows a connection from the pool, begins the transaction and releases the connection to the pool when the transaction ends.
pool.begin(res -> {
if (res.succeeded()) {
// Get the transaction
Transaction tx = res.result();
// Various statements
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute(ar1 -> {
if (ar1.succeeded()) {
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute(ar2 -> {
if (ar2.succeeded()) {
// Commit the transaction
// the connection will automatically return to the pool
tx.commit(ar3 -> {
if (ar3.succeeded()) {
System.out.println("Transaction succeeded");
} else {
System.out.println("Transaction failed " + ar3.cause().getMessage());
}
});
}
});
} else {
// No need to close connection as transaction will abort and be returned to the pool
}
});
}
});
注意
|
this code will not close the connection because it will always be released back to the pool when the transaction |
Cursors and streaming
By default prepared query execution fetches all rows, you can use a
Cursor
to control the amount of rows you want to read:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
Cursors shall be closed when they are released prematurely:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
A stream API is also available for cursors, which can be more convenient, specially with the Rxified version.
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
The stream read the rows by batch of 50
and stream them, when the rows have been passed to the handler,
a new batch of 50
is read and so on.
The stream can be resumed or paused, the loaded rows will remain in memory until they are delivered and the cursor will stop iterating.
DB2 type mapping
Currently the client supports the following DB2 types
-
BOOLEAN (
java.lang.Boolean
) (DB2 LUW only) -
SMALLINT (
java.lang.Short
) -
INTEGER (
java.lang.Integer
) -
BIGINT (
java.lang.Long
) -
REAL (
java.lang.Float
) -
DOUBLE (
java.lang.Double
) -
DECIMAL (
io.vertx.sqlclient.data.Numeric
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
ENUM (
java.lang.String
) -
DATE (
java.time.LocalDate
) -
TIME (
java.time.LocalTime
) -
TIMESTAMP (
java.time.LocalDateTime
) -
BINARY (
byte[]
) -
VARBINARY (
byte[]
) -
ROWID (
io.vertx.db2client.impl.drda.DB2RowId
orjava.sql.RowId
) (DB2 z/OS only)
Some types that are currently NOT supported are:
-
XML
-
BLOB
-
CLOB
-
DBCLOB
-
GRAPHIC / VARGRAPHIC
For a further documentation on DB2 data types, see the following resources:
Tuple decoding uses the above types when storing values, it also performs on the fly conversion of the actual value when possible:
pool
.query("SELECT an_int_column FROM exampleTable")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as INTEGER column type and represented as java.lang.Integer
Object value = row.getValue(0);
// Convert to java.lang.Long
Long longValue = row.getLong(0);
});
Using Java enum types
You can map Java enum types to these column types:
-
Strings (VARCHAR, TEXT)
-
Numbers (SMALLINT, INTEGER, BIGINT)
client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows");
for (Row row : rows) {
System.out.println("Day: " + row.getValues(Days.class, row.getColumnIndex("day_name")));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows");
for (Row row : rows) {
System.out.println("Day: " + row.getValues(Days.class, row.getColumnIndex("day_num")));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The String type is matched with the Java enum’s name returned by the name()
method.
Number types are matched with the Java enum’s ordinal returned by the ordinal()
method and the row.get() method returns the corresponding enum’s name()
value at the ordinal position of the integer value retrieved.
Collector queries
You can use Java collectors with the query API:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The collector processing must not keep a reference on the Row
as
there is a single row used for processing the entire set.
The Java Collectors
provides many interesting predefined collectors, for example you can
create easily create a string directly from the row set:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Using SSL/TLS
To configure the client to use SSL connection, you can configure the DB2ConnectOptions
like a Vert.x NetClient
.
DB2ConnectOptions options = new DB2ConnectOptions()
.setPort(50001)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSsl(true)
.setTrustStoreOptions(new JksOptions()
.setPath("/path/to/keystore.p12")
.setPassword("keystoreSecret"));
DB2Connection.connect(vertx, options, res -> {
if (res.succeeded()) {
// Connected with SSL
} else {
System.out.println("Could not connect " + res.cause());
}
});
More information can be found in the Vert.x documentation.
Using a proxy
You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy.
More information can be found in the Vert.x documentation.
Unresolved directive in index.adoc - include::override/rxjava2.adoc[]