async_thread_pool.rb

lib/sequel/extensions/async_thread_pool.rb
Last Update: 2024-02-07 11:09:08 -0800

The async_thread_pool extension adds support for running database queries in separate threads using a thread pool. With the following code:

DB.extension :async_thread_pool
foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)

All 3 queries will be run in separate threads. foos, bar_names and baz_1 will be proxy objects. Calling a method on the proxy object will wait for the query to be run, and will return the result of calling that method on the result of the query method. For example, if you run:

foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)
sleep(1)
foos.size
bar_names.first
baz_1.name

These three queries will generally be run concurrently in separate threads. If you instead run:

DB[:foos].async.where(name: 'A'..'M').all.size
DB[:bars].async.select_order_map(:name).first
DB[:bazes].async.first(id: 1).name

Then each query will be run sequentially, since you need the result of one query before running the next query. The queries will still be run in separate threads (by default).
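The waiting behavior described above can be pictured with a small proxy built only on Ruby's standard Thread class. This is a rough sketch and not Sequel's actual proxy implementation: the TinyProxy class and its internals are hypothetical, and only the __value method name is taken from this documentation.

```ruby
# Hypothetical stand-in for an async result proxy; not Sequel's real class.
class TinyProxy < BasicObject
  def initialize(&block)
    # Start the work immediately in a background thread.
    @thread = ::Thread.new(&block)
  end

  # Block until the thread finishes, then return the block's result.
  def __value
    @thread.value
  end

  # Forward every other method call to the finished result.
  def method_missing(name, *args, &block)
    __value.public_send(name, *args, &block)
  end
end

# Both blocks start running before either result is used,
# so they can overlap, as in the first example above.
rows  = TinyProxy.new { sleep 0.1; [{id: 1}, {id: 2}] }
names = TinyProxy.new { sleep 0.1; %w[a b] }

count      = rows.size    # waits for the first thread, then calls Array#size
first_name = names.first  # waits for the second thread, then calls Array#first
```

Chaining a method directly onto each TinyProxy.new call would instead wait for each block before the next one starts, which mirrors the sequential case.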

What is run in the separate thread is the entire method call that returns results. So with the original example:

foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)

The all, select_order_map(:name), and first(id: 1) calls are run in separate threads. If a block is passed to a method such as all or each, the block is also run in that thread. If you have code such as:

h = {}
DB[:foos].async.each{|row| h[row[:id]] = row}
bar_names = DB[:bars].async.select_order_map(:name)
p h

You may end up with it printing an empty hash or partial hash, because the async each call will not have run or finished running. Since the p h code relies on a side-effect of the each block and not the return value of the each call, it will not wait for the loading.

You should avoid using async for any queries where you are ignoring the return value, as otherwise you have no way to wait for the query to be run.
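The underlying race can be reproduced with a plain Ruby thread, without Sequel. In this sketch the rows array and worker thread are hypothetical stand-ins for the dataset and the async thread. Keeping a handle to the background work is what makes it possible to wait for the side effect. With Sequel, the analogous step would be keeping the proxy returned by the async each call and calling __value on it before reading h.

```ruby
h = {}
rows = [{id: 1}, {id: 2}]  # stand-in for the rows a dataset would yield

# Background work that fills h as a side effect, like the each block above.
worker = Thread.new do
  rows.each do |row|
    sleep 0.05
    h[row[:id]] = row
  end
end

size_before = h.size  # may be 0 or partial, since nothing waited for the worker
worker.value          # waits for the background work to finish
size_after = h.size   # now reflects every row
```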

Datasets that use async will use async threads to load data for the majority of methods that can return data. However, dataset methods that return enumerators will not use an async thread (e.g. calling Dataset#map without a block or arguments does not use an async thread or return a proxy object).

Because async methods (including their blocks) run in a separate thread, you should not use control flow modifiers such as return or break in async queries. Doing so will result in an error.

Because async results are returned as proxy objects, it’s a bad idea to use them in a boolean setting:

result = DB[:foo].async.get(:boolean_column)
# or:
result = DB[:foo].async.first

# ...
if result
  # will always execute this branch, since result is a proxy object
end

In this case, you can call the __value method to return the actual result:

if result.__value
  # will not execute this branch if the dataset method returned nil or false
end

Similarly, because a proxy object is used, you should be careful using the result in a case statement or an argument to Class#===:

# ...
case result
when Hash, true, false
  # will never take this branch, since result is a proxy object
end

Similar to usage in an if statement, you should use __value:

case result.__value
when Hash, true, false
  # will take this branch if the dataset method returned a Hash, true, or false
end

On Ruby 2.2+, you can use itself instead of __value. It’s preferable to use itself if you can, as that will allow code to work with both proxy objects and regular objects.
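As an illustration of why itself is the more portable choice, the sketch below uses a hypothetical ValueProxy class (not Sequel's proxy) that forwards unknown methods to a wrapped value. The describe helper is also hypothetical. Because itself is forwarded by the proxy and returns the receiver for a regular object, the same helper handles both.

```ruby
# Hypothetical proxy for illustration only; Sequel's proxy class differs.
class ValueProxy < BasicObject
  def initialize(value)
    @value = value
  end

  def __value
    @value
  end

  # Forward everything else (including itself) to the wrapped value.
  def method_missing(name, *args, &block)
    @value.public_send(name, *args, &block)
  end
end

# Works whether result is a proxy or a regular object.
def describe(result)
  case result.itself
  when Hash then :row
  when nil  then :missing
  else :other
  end
end

nil_proxy = ValueProxy.new(nil)
truthiness = nil_proxy ? :truthy : :falsy        # the proxy object itself is never nil or false
matches_hash = Hash === ValueProxy.new({id: 1})  # Class#=== checks the proxy, not the wrapped value
```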

Because separate threads and connections are used for async queries, they do not use any state on the current connection/thread. So if you do:

DB.transaction{DB[:table].async.all}

Be aware that the transaction runs on one connection, and the SELECT query on a different connection. If you are currently using transactional testing (running each test inside a transaction/savepoint), and want to start using this extension, you should first switch to non-transactional testing of the code that will use the async thread pool, as otherwise the use of Dataset#async will likely break your tests.

If you are using Database#synchronize to checkout a connection, the same issue applies, where the async query runs on a different connection:

DB.synchronize{DB[:table].async.all}

Similarly, if you are using the server_block extension, any async queries inside with_server blocks will not use the server specified:

DB.with_server(:shard1) do
  DB[:a].all # Uses shard1
  DB[:a].async.all # Uses default shard
end

You need to manually specify the shard for any dataset using an async query:

DB.with_server(:shard1) do
  DB[:a].all # Uses shard1
  DB[:a].async.server(:shard1).all # Uses shard1
end

When using the async_thread_pool extension, the size of the async thread pool can be set by using the :num_async_threads Database option, which must be set before loading the async_thread_pool extension. This defaults to the size of the Database object’s connection pool.

By default, for consistent behavior, the async_thread_pool extension will always run the query in a separate thread. However, in some cases, such as when the async thread pool is busy and the results of a query are needed right away, it can improve performance to allow preemption, so that the query will run in the current thread instead of waiting for an async thread to become available. With the following code:

foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
if foos.length > 4
  baz_1 = DB[:bazes].async.first(id: 1)
end

Whether you need the baz_1 variable depends on the value of foos. If the async thread pool is busy, and by the time the foos.length call is made, the async thread pool has not started the processing to get the foos value, it can improve performance to start that processing in the current thread, since it is needed immediately to determine whether to schedule the query to get the baz_1 variable. The default is to not allow preemption, because if the current thread is used, it may have already checked out a connection that could be used, and that connection could be inside a transaction or have some other manner of connection-specific state applied to it. If you want to allow preemption, you can set the :preempt_async_thread Database option before loading the async_thread_pool extension.
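The idea behind preemption can be sketched with the standard library alone. The PreemptableJob class below is hypothetical and is not how Sequel implements the :preempt_async_thread option. It only shows the general technique: a job sits in a queue for a worker, but a caller that needs the value first may run the job in its own thread, and a mutex ensures the block runs only once.

```ruby
# Hypothetical job that either a pool worker or the waiting caller may run.
class PreemptableJob
  attr_reader :ran_in

  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @finished = false
  end

  # Whoever gets here first runs the block; later callers find it finished.
  def run
    @mutex.synchronize do
      unless @finished
        @value = @block.call
        @ran_in = Thread.current
        @finished = true
      end
    end
  end

  # The caller needs the result now, so it runs the job itself if no worker has.
  def value
    run
    @value
  end
end

calls = 0
queue = Queue.new
job = PreemptableJob.new { calls += 1; 42 }
queue << job

# No worker has picked the job up yet (a busy pool), so the caller preempts.
result = job.value

# A worker that gets to the job later finds nothing left to do.
worker = Thread.new { queue.pop.run }
worker.join
```

Running the job in the caller's thread is exactly what makes preemption unsafe by default: any connection-specific state held by that thread could leak into the query.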

Note that the async_thread_pool extension creates the thread pool when it is loaded into the Database. If you fork after loading the extension, the extension will not work, as fork does not copy the thread pools. If you are using a forking webserver (or any other system that forks worker processes), load this extension in each child process; do not load it before forking.
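The reason behind the forking caveat is general Ruby behavior: in a forked child, only the thread that called fork keeps running. The sketch below demonstrates this with a plain thread standing in for a pool worker. It does not use Sequel, its variable names are hypothetical, and it assumes a platform where Process.fork is available.

```ruby
# A background thread standing in for a thread pool worker.
worker = Thread.new { sleep }

reader, writer = IO.pipe
pid = Process.fork do
  reader.close
  # In the child, the worker thread was not carried over by fork.
  writer.puts(worker.alive?)
  writer.close
  exit!(0)  # leave the child without running at_exit handlers inherited from the parent
end

writer.close
alive_in_child = reader.read.strip  # what the child reported
reader.close
Process.wait(pid)

alive_in_parent = worker.alive?     # the parent still has its worker
worker.kill
```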

Related module: Sequel::Database::AsyncThreadPool::DatasetMethods