Reactive MySQL Client
The Reactive MySQL Client is a client for MySQL with a straightforward API focusing on scalability and low overhead.
Features
-
Event driven
-
Lightweight
-
Built-in connection pooling
-
Prepared queries caching
-
Cursor support
-
Row streaming
-
RxJava 1 and RxJava 2
-
Direct memory to object without unnecessary copies
-
Java 8 Date and Time
-
Stored Procedures support
-
TLS/SSL support
-
MySQL utilities commands support
-
Working with MySQL and MariaDB
-
Rich collation and charset support
-
Unix domain socket
Usage
To use the Reactive MySQL Client add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>3.9.15</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-mysql-client:3.9.15'
}
Getting started
Here is the simplest way to connect, query and disconnect
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
Connecting to MySQL
Most of the time you will use a pool to connect to MySQL:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
The pooled client uses a connection pool and any operation will borrow a connection from the pool to execute the operation and release it to the pool.
If you are running with Vert.x you can pass it your Vertx instance:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
You need to release the pool when you don’t need it anymore:
pool.close();
When you need to execute several operations on the same connection, you need to use a client
connection
.
You can easily get one from the pool:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
client.getConnection(ar1 -> {
if (ar1.succeeded()) {
System.out.println("Connected");
// Obtain our connection
SqlConnection conn = ar1.result();
// All operations execute on the same connection
conn
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar2 -> {
if (ar2.succeeded()) {
conn
.query("SELECT * FROM users WHERE id='emad'")
.execute(ar3 -> {
// Release the connection to the pool
conn.close();
});
} else {
// Release the connection to the pool
conn.close();
}
});
} else {
System.out.println("Could not connect: " + ar1.cause().getMessage());
}
});
Once you are done with the connection you must close it to release it to the pool, so it can be reused.
Unix Domain Socket
Sometimes for simplicity, security or performance reasons, it is required to connect via a Unix Domain Socket.
Since the JVM does not support domain sockets, first you must add native transport extensions to your project.
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
注意
|
The native epoll support for ARM64 can also be added with the classifier linux-aarch64 .
|
注意
|
If there are Mac users in your team, add netty-transport-native-kqueue with the classifier osx-x86_64 .
|
Then set the path to the domain socket in MySQLConnectOptions#setHost
:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setHost("/var/run/mysqld/mysqld.sock")
.setDatabase("the-db");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
// vertxOptions.setPreferNativeTransport(true);
MySQLPool client2 = MySQLPool.pool(vertx, connectOptions, poolOptions);
More information about native transports can be found in the [Vert.x documentation](https://vertx.io/docs/vertx-core/java/#_native_transports).
Configuration
There are several alternatives for you to configure the client.
Data Object
A simple way to configure the client is to specify a MySQLConnectOptions
data object.
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// Handling your connection
});
collations and character sets
The Reactive MySQL client supports configuring collations or character sets and map them to a correlative java.nio.charset.Charset
.
For example, you can specify charset for a connection like
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// set connection character set to utf8 instead of the default charset utf8mb4
connectOptions.setCharset("utf8");
The Reactive MySQL Client will take utf8mb4
as the default charset. String values like password and error messages are always decoded in UTF-8
charset.
characterEncoding
option is used to determine which Java charset will be used to encode String values such as query string and parameter values, the charset is UTF-8
by default and if it’s set to null
then the client will use the default Java charset instead.
You can also specify collation for a connection like
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// set connection collation to utf8_general_ci instead of the default collation utf8mb4_general_ci
// setting a collation will override the charset option
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");
Note setting a collation on the data object will override the charset and characterEncoding option.
You can execute SQL SHOW COLLATION;
or SHOW CHARACTER SET;
to get the supported collations and charsets by the server.
More information about MySQL charsets and collations can be found in the MySQL Reference Manual.
connection attributes
You can also configure the connection attributes with the setProperties
or addProperty
methods. Note setProperties
will override the default client properties.
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// Add a connection attribute
connectOptions.addProperty("_java_version", "1.8.0_212");
// Override the attributes
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);
More information about client connection attributes can be found in the MySQL Reference Manual.
useAffectedRows
You can configure the useAffectedRows
option to decide whether to set CLIENT_FOUND_ROWS
flag when connecting to the server. If the CLIENT_FOUND_ROWS
flag is specified then the affected rows count is the numeric value of rows found rather than affected.
More information about this can be found in the MySQL Reference Manual
connection URI
Apart from configuring with a MySQLConnectOptions
data object, We also provide you an alternative way to connect when you want to configure with a connection URI:
String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";
// Create the pool from the connection URI
MySQLPool pool = MySQLPool.pool(connectionUri);
// Create the connection from the connection URI
MySQLConnection.connect(vertx, connectionUri, res -> {
// Handling your connection
});
More information about connection string formats can be found in the MySQL Reference Manual.
Currently, the client supports the following parameter keywords in connection uri (keys are case-insensitive):
-
host
-
port
-
user
-
password
-
schema
-
socket
-
useAffectedRows
Running queries
When you don’t need a transaction or run single queries, you can run queries directly on the pool; the pool will use one of its connection to run the query and return the result to you.
Here is how to run simple queries:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Prepared queries
You can do the same with prepared queries.
The SQL string can refer to parameters by position, using the database syntax `?`
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Query methods provides an asynchronous RowSet
instance that works for SELECT queries
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
or UPDATE/INSERT queries:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The Row
gives you access to your data by index
System.out.println("User " + row.getString(0) + " " + row.getString(1));
or by name
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
The client will not do any magic here and the column name is identified with the name in the table regardless of how your SQL text is.
You can access a wide variety of of types
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
You can use cached prepared statements to execute one-shot prepared queries:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
You can create a PreparedStatement
and manage the lifecycle by yourself.
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Batches
You can execute prepared batch
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
MySQL LAST_INSERT_ID
You can get the auto incremented value if you insert a record into the table.
client
.query("INSERT INTO test(val) VALUES ('v1')")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
System.out.println("Last inserted id is: " + lastInsertId);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
More information can be found in How to Get the Unique ID for the Last Inserted Row.
Using connections
When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool:
pool.getConnection(ar1 -> {
if (ar1.succeeded()) {
SqlConnection connection = ar1.result();
connection
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar2 -> {
if (ar1.succeeded()) {
connection
.query("SELECT * FROM users WHERE id='paulo'")
.execute(ar3 -> {
// Do something with rows and return the connection to the pool
connection.close();
});
} else {
// Return the connection to the pool
connection.close();
}
});
}
});
Prepared queries can be created:
connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement prepared = ar1.result();
prepared.query().execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
// All rows
RowSet<Row> rows = ar2.result();
}
});
}
});
Using transactions
Transactions with connections
You can execute transaction using SQL BEGIN
/COMMIT
/ROLLBACK
, if you do so you must use
a SqlConnection
and manage it yourself.
Or you can use the transaction API of SqlConnection
:
pool.getConnection(res -> {
if (res.succeeded()) {
// Transaction must use a connection
SqlConnection conn = res.result();
// Begin the transaction
Transaction tx = conn.begin();
// Various statements
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute(ar1 -> {
if (ar1.succeeded()) {
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute(ar2 -> {
if (ar2.succeeded()) {
// Commit the transaction
tx.commit(ar3 -> {
if (ar3.succeeded()) {
System.out.println("Transaction succeeded");
} else {
System.out.println("Transaction failed " + ar3.cause().getMessage());
}
// Return the connection to the pool
conn.close();
});
} else {
// Return the connection to the pool
conn.close();
}
});
} else {
// Return the connection to the pool
conn.close();
}
});
}
});
When the database server reports the current transaction is failed (e.g the infamous current transaction is aborted, commands ignored until
end of transaction block), the transaction is rollbacked and the abortHandler
is called:
tx.abortHandler(v -> {
System.out.println("Transaction failed => rollbacked");
});
Simplified transaction API
When you use a pool, you can start a transaction directly on the pool.
It borrows a connection from the pool, begins the transaction and releases the connection to the pool when the transaction ends.
pool.begin(res -> {
if (res.succeeded()) {
// Get the transaction
Transaction tx = res.result();
// Various statements
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute(ar1 -> {
if (ar1.succeeded()) {
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute(ar2 -> {
if (ar2.succeeded()) {
// Commit the transaction
// the connection will automatically return to the pool
tx.commit(ar3 -> {
if (ar3.succeeded()) {
System.out.println("Transaction succeeded");
} else {
System.out.println("Transaction failed " + ar3.cause().getMessage());
}
});
}
});
} else {
// No need to close connection as transaction will abort and be returned to the pool
}
});
}
});
注意
|
this code will not close the connection because it will always be released back to the pool when the transaction |
Cursors and streaming
By default prepared query execution fetches all rows, you can use a
Cursor
to control the amount of rows you want to read:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
Cursors shall be closed when they are released prematurely:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
A stream API is also available for cursors, which can be more convenient, specially with the Rxified version.
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
The stream read the rows by batch of 50
and stream them, when the rows have been passed to the handler,
a new batch of 50
is read and so on.
The stream can be resumed or paused, the loaded rows will remain in memory until they are delivered and the cursor will stop iterating.
MySQL type mapping
Currently the client supports the following MySQL types
-
BOOL,BOOLEAN (
java.lang.Byte
) -
TINYINT (
java.lang.Byte
) -
TINYINT UNSIGNED(
java.lang.Short
) -
SMALLINT (
java.lang.Short
) -
SMALLINT UNSIGNED(
java.lang.Integer
) -
MEDIUMINT (
java.lang.Integer
) -
MEDIUMINT UNSIGNED(
java.lang.Integer
) -
INT,INTEGER (
java.lang.Integer
) -
INTEGER UNSIGNED(
java.lang.Long
) -
BIGINT (
java.lang.Long
) -
BIGINT UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
FLOAT (
java.lang.Float
) -
FLOAT UNSIGNED(
java.lang.Float
) -
DOUBLE (
java.lang.Double
) -
DOUBLE UNSIGNED(
java.lang.Double
) -
BIT (
java.lang.Long
) -
NUMERIC (
io.vertx.sqlclient.data.Numeric
) -
NUMERIC UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
DATE (
java.time.LocalDate
) -
DATETIME (
java.time.LocalDateTime
) -
TIME (
java.time.Duration
) -
TIMESTAMP (
java.time.LocalDateTime
) -
YEAR (
java.lang.Short
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
BINARY (
io.vertx.core.buffer.Buffer
) -
VARBINARY (
io.vertx.core.buffer.Buffer
) -
TINYBLOB (
io.vertx.core.buffer.Buffer
) -
TINYTEXT (
java.lang.String
) -
BLOB (
io.vertx.core.buffer.Buffer
) -
TEXT (
java.lang.String
) -
MEDIUMBLOB (
io.vertx.core.buffer.Buffer
) -
MEDIUMTEXT (
java.lang.String
) -
LONGBLOB (
io.vertx.core.buffer.Buffer
) -
LONGTEXT (
java.lang.String
) -
ENUM (
java.lang.String
) -
SET (
java.lang.String
) -
JSON (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
)
Tuple decoding uses the above types when storing values
Note: In Java there is no specific representations for unsigned numeric values, so this client will convert an unsigned value to the correlated Java type.
Implicit type conversion
The Reactive MySQL Client supports implicit type conversions when executing a prepared statement.
Suppose you have a TIME
column in your table, the two examples below will both work here.
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of(LocalTime.of(19, 10, 25)), ar -> {
// handle the results
});
// this will also work with implicit type conversion
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of("19:10:25"), ar -> {
// handle the results
});
The MySQL data type for encoding will be inferred from the parameter values and here is the type mapping
Parameter value type | encoding MySQL type |
---|---|
null |
MYSQL_TYPE_NULL |
java.lang.Byte |
MYSQL_TYPE_TINY |
java.lang.Boolean |
MYSQL_TYPE_TINY |
java.lang.Short |
MYSQL_TYPE_SHORT |
java.lang.Integer |
MYSQL_TYPE_LONG |
java.lang.Long |
MYSQL_TYPE_LONGLONG |
java.lang.Double |
MYSQL_TYPE_DOUBLE |
java.lang.Float |
MYSQL_TYPE_FLOAT |
java.time.LocalDate |
MYSQL_TYPE_DATE |
java.time.Duration |
MYSQL_TYPE_TIME |
java.time.LocalTime |
MYSQL_TYPE_TIME |
io.vertx.core.buffer.Buffer |
MYSQL_TYPE_BLOB |
java.time.LocalDateTime |
MYSQL_TYPE_DATETIME |
default |
MYSQL_TYPE_STRING |
Handling BOOLEAN
In MySQL BOOLEAN
and BOOL
data types are synonyms for TINYINT(1)
. A value of zero is considered false, non-zero values are considered true.
A BOOLEAN
data type value is stored in Row
or Tuple
as java.lang.Byte
type, you can call Row#getValue
to retrieve it as a java.lang.Byte
value,
or you can call Row#getBoolean
to retrieve it as java.lang.Boolean
value.
client
.query("SELECT graduated FROM students WHERE id = 0")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rowSet = ar.result();
for (Row row : rowSet) {
int pos = row.getColumnIndex("graduated");
Byte value = row.get(Byte.class, pos);
Boolean graduated = row.getBoolean("graduated");
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
When you want to execute a prepared statement with a param of a BOOLEAN
value, you can simply add the java.lang.Boolean
value to the params list.
client
.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
.execute(Tuple.of(true), ar -> {
if (ar.succeeded()) {
System.out.println("Updated with the boolean value");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Handling JSON
MySQL JSON
data type is represented by the following Java types:
-
String
-
Number
-
Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
for representing the JSON null literal
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // Expect JSON object
//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3
Handling BIT
The BIT
data type is mapped to java.lang.Long
type, but Java has no notion of unsigned numeric values, so if you want to insert or update a record with the max value of BIT(64)
, you can do some tricks setting the parameter to -1L
.
Handling TIME
MySQL TIME
data type can be used to represent either time of a day or a time interval which ranges from -838:59:59
to 838:59:59
. In Reactive MySQL client the TIME
data type is mapped to java.time.Duration
natively, but you can also retrieve it as a java.time.LocalTime
via Row#getLocalTime
accessor.
Handling NUMERIC
The Numeric
Java type is used to represent the MySQL NUMERIC
type.
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// Handle NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
Collector queries
You can use Java collectors with the query API:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
The collector processing must not keep a reference on the Row
as
there is a single row used for processing the entire set.
The Java Collectors
provides many interesting predefined collectors, for example you can
create easily create a string directly from the row set:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL Stored Procedure
You can run stored procedures in queries. The result will be retrieved from the server following the MySQL protocol without any magic here.
client.query("CREATE PROCEDURE multi() BEGIN\n" +
" SELECT 1;\n" +
" SELECT 1;\n" +
" INSERT INTO ins VALUES (1);\n" +
" INSERT INTO ins VALUES (2);\n" +
"END;").execute(ar1 -> {
if (ar1.succeeded()) {
// create stored procedure success
client
.query("CALL multi();")
.execute(ar2 -> {
if (ar2.succeeded()) {
// handle the result
RowSet<Row> result1 = ar2.result();
Row row1 = result1.iterator().next();
System.out.println("First result: " + row1.getInteger(0));
RowSet<Row> result2 = result1.next();
Row row2 = result2.iterator().next();
System.out.println("Second result: " + row2.getInteger(0));
RowSet<Row> result3 = result2.next();
System.out.println("Affected rows: " + result3.rowCount());
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar1.cause().getMessage());
}
});
Note: Prepared statements binding OUT parameters is not supported for now.
MySQL LOCAL INFILE
This client supports for handling the LOCAL INFILE Request, if you want to load data from a local file into the server, you can use query
LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;
. More information can be found in the MySQL Reference Manual.
Authentication
Default authentication plugin
This client supports for specifying the default authentication plugin to use at the connection start. Currently the following plugins are supported:
-
mysql_native_password
-
caching_sha2_password
-
mysql_clear_password
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // set the default authentication plugin
New authentication method introduced in MySQL 8
MySQL 8.0 introduces a new authentication method named caching_sha2_password
and it’s the default one to authenticate.
In order to connect to the server using this new authentication method, you need either use a secure connection(i.e. enable TLS/SSL) or exchange the encrypted password using an RSA key pair to avoid leaks of password. The RSA key pair is automatically exchanged during the communication, but the server RSA public key may be hacked during the process since it’s transferred on a insecure connection.
So if you’re on a insecure connection and want to avoid the risk of exposing the server RSA public key, you can set the server RSA public key like this:
MySQLConnectOptions options1 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyPath("tls/files/public_key.pem"); // configure with path of the public key
MySQLConnectOptions options2 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
"xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
"HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
"aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
"0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
"7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
"5QIDAQAB\n" +
"-----END PUBLIC KEY-----\n")); // configure with buffer of the public key
More information about the caching_sha2_password
authentication method can be found in the MySQL Reference Manual.
Using SSL/TLS
To configure the client to use SSL connection, you can configure the MySQLConnectOptions
like a Vert.x NetClient
.
All SSL modes are supported and you are able to configure sslmode
. The client is in DISABLED
SSL mode by default.
ssl
parameter is kept as a mere shortcut for setting sslmode
. setSsl(true)
is equivalent to setSslMode(VERIFY_CA)
and setSsl(false)
is equivalent to setSslMode(DISABLED)
.
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
MySQLConnection.connect(vertx, options, res -> {
if (res.succeeded()) {
// Connected with SSL
} else {
System.out.println("Could not connect " + res.cause());
}
});
More information can be found in the Vert.x documentation.
MySQL utility command
Sometimes you want to use MySQL utility commands and we provide support for this. More information can be found in the MySQL utility commands.
COM_PING
You can use COM_PING
command to check if the server is alive. The handler will be notified if the server responds to the PING, otherwise the handler will never be called.
connection.ping(ar -> {
System.out.println("The server has responded to the PING");
});
COM_RESET_CONNECTION
You can reset the session state with COM_RESET_CONNECTION
command, this will reset the connection state like:
- user variables
- temporary tables
- prepared statements
connection.resetConnection(ar -> {
if (ar.succeeded()) {
System.out.println("Connection has been reset now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_CHANGE_USER
You can change the user of the current connection, this will perform a re-authentication and reset the connection state like COM_RESET_CONNECTION
.
MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
.setUser("newuser")
.setPassword("newpassword")
.setDatabase("newdatabase");
connection.changeUser(authenticationOptions, ar -> {
if (ar.succeeded()) {
System.out.println("User of current connection has been changed.");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_INIT_DB
You can use COM_INIT_DB
command to change the default schema of the connection.
connection.specifySchema("newschema", ar -> {
if (ar.succeeded()) {
System.out.println("Default schema changed to newschema");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_STATISTICS
You can use COM_STATISTICS
command to get a human readable string of some internal status variables in MySQL server.
connection.getInternalStatistics(ar -> {
if (ar.succeeded()) {
System.out.println("Statistics: " + ar.result());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_DEBUG
You can use COM_DEBUG
command to dump debug info to the MySQL server’s STDOUT.
connection.debug(ar -> {
if (ar.succeeded()) {
System.out.println("Debug info dumped to server's STDOUT");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_SET_OPTION
You can use COM_SET_OPTION
command to set options for the current connection. Currently only CLIENT_MULTI_STATEMENTS
can be set.
For example, you can disable CLIENT_MULTI_STATEMENTS
with this command.
connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
if (ar.succeeded()) {
System.out.println("CLIENT_MULTI_STATEMENTS is off now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL and MariaDB version support matrix
MySQL | MariaDB | ||
---|---|---|---|
Version |
Supported |
Version |
Supported |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
Known issues:
-
Reset connection utility command does not work in MySQL 5.5, 5.6 and MariaDB 10.1
-
Change user utility command is not supported with MariaDB 10.2 and 10.3
Pitfalls & Good Practices
Here are some good practices for you to avoid common pitfalls when using the Reactive MySQL Client.
prepared statement count limit
Sometimes you might meet the notorious error Can’t create more than max_prepared_stmt_count statements (current value: 16382)
, this is because the server has reached the limit of total number of prepared statement.
You can adjust the server system variable max_prepared_stmt_count
but it has an upper bound value so you can’t get rid of the error in this way.
The best way to alleviate this is enabling prepared statement caching, so the prepared statements with the same SQL string could be reused and the client does not have to create a brand new prepared statement for every request. The prepared statement will be automatically closed after the statement is executed. In this way the chances of reaching the limit could be greatly reduced though it could not be totally eliminated.
You can also manage the lifecycle of prepared statements manually by creating a PreparedStatement
object via SqlConnection#prepare
interface so that you can choose when to deallocate the statement handle, or even use the SQL syntax prepared statement.
demystifying prepared batch
There is time when you want to batch insert data into the database, you can use PreparedQuery#executeBatch
which provides a simple API to handle this.
Keep in mind that MySQL does not natively support batching protocol so the API is only a sugar by executing the prepared statement one after another, which means more network round trips are required comparing to inserting multiple rows by executing one prepared statement with a list of values.
tricky DATE & TIME data types
Handling MYSQL DATE and TIME data types especially with time zones is tricky therefore the Reactive MySQL Client does no magic transformation for those values.
-
MySQL DATETIME data type does not contain time zone info, so what you get is identical to what you set no matter what time zone is in the current session.
-
MySQL TIMESTAMP data type contains time zone info, so when you set or get the value it’s always transformed by the server with the timezone set in the current session.