coil 的几种用法:
- 在方法上调用enqueue(conn),其中 conn 是在 smol::block_on() 中创建的:
#[coil::background_job]
fn check_arg_equal_to_env(env: &String, arg: String) -> Result<(), PerformError> {
if env == &arg {
Ok(())
} else {
Err("arg wasn't env!".into())
}
}
let (runner, rx) = TestGuard::runner("a".to_string(), 2);
smol::block_on(async {
let mut conn = runner.connection_pool().acquire().await.unwrap();
check_arg_equal_to_env("a".into())
.enqueue(&mut conn)
.await
.unwrap();
check_arg_equal_to_env("b".into())
.enqueue(&mut conn)
.await
.unwrap();
runner.run_all_sync_tasks().await.unwrap();
});
- 方法可以带上 conn 作为参数,不过 conn 仍然还是在 smol::block_on() 里创建的
#[coil::background_job]
fn takes_env_and_conn_sync(_env: &(), pool: &PgPool) -> Result<(), coil::PerformError> {
smol::block_on(sqlx::query("SELECT 1").execute(pool))?;
Ok(())
}
let (runner, rx) = TestGuard::dummy_runner();
smol::block_on(async {
let mut conn = runner.connection_pool().acquire().await.unwrap();
takes_env_and_conn_sync().enqueue(&mut conn).await.unwrap();
runner.run_all_async_tasks().await.unwrap();
});
- 先 register_job 注册 Job,后续需要执行的时候,才需要调用方法的enqueue(conn) 开始执行:
#[coil::background_job]
fn can_specify_where_clause<S>(_eng: &(), arg: S) -> Result<(), coil::PerformError>
where
S: Serialize + DeserializeOwned + std::fmt::Display,
{
arg.to_string();
Ok(())
}
let (tx, rx) = channel::bounded(1);
let runner = TestGuard::builder(())
.register_job::<can_specify_where_clause::Job<String>>()
.on_finish(move |_| {
let _ = smol::block_on(tx.send(coil::Event::Dummy)).unwrap();
})
.build();
smol::block_on(async {
let mut conn = runner.connection_pool().acquire().await.unwrap();
can_specify_where_clause("hello".to_string())
.enqueue(&mut conn)
.await
.unwrap();
runner.run_all_sync_tasks().await.unwrap();
runner.check_for_failed_jobs(rx, 1).await.unwrap();
});
substrate archive 的框架图: archive 中 actors 是管理所有 Actor 的入口,其中 Runner 注册了 execute_block() 这个 Job: execute_block() 是真正的业务方法,这个方法接收一个 Env,以及一个区块 block,最终向 StorageActor 发送 Storage 消息: storage 的构建,是从 Block 转换为 Storage的,这里会调用 api.execute_block(id,block) 方法执行区块。
- api.execute_block() 类似于 node runtime 里 impl_runtime_apis! 对 Core 的 execute_block() 调用
- api 的 execute_block() 属于 Runtime Api,一旦调用了 Runtime Api,即进入了 Runtime 层
- 执行完区块后,会产生 storage 的变更信息(storage changes),这样就将 Block 转为了 BlockChange
- 这里的 Block 是 primitives/runtime 定义的 trait Block,而 BlockChange 是 archive 自定义的结构体
在看代码时,如果出现了 BlockT,表示的是 substrate primitives/runtime 下的 Block trait。
let storage = {
// storage scope
let handler = env
.tracing_targets
.as_ref()
.map(|t| TraceHandler::new(&t, number.into(), hash.as_ref().to_vec(), span_events.clone()));
let _guard = handler.map(tracing::subscriber::set_default);
let now = std::time::Instant::now();
let block = BlockExecutor::new(api, &env.backend, block).block_into_storage()?;
log::debug!("Took {:?} to execute block", now.elapsed());
Storage::from(block)
};
上面 actors.rs 的 main_loop() 中只是注册了 execute_block() 这个 Job。真正的执行流程:
- Crawl 从 secondly db(rocksdb)读取到一个新的块后,插入到 PostgreSQL DB
- PostgreSQL DB 定义了一个触发器,当 blocks 表有新数据插入时,触发监听器的调用
- 触发器传递给监听器的内容为:table、action、id,即哪张表的哪行记录出现了哪种类型的操作(比如插入)
- 在 main_loop() 的最开始,初始化了监听器的执行逻辑,接受的参数为 Notif、Connection
下面代码表示只关注 Blocks 这个 Channel,对应的值为“blocks_update”。监听器的逻辑为 move 里的内容。
- 根据 id 查询出指定的 block,然后进行反序列化
- 调用之前定义的 execute_job() 这个异步作业,调用其 enqueue(),传入 conn
- 因为在 main_loop() 里调用了 runner.run_all_sync_tasks(),所以只要把作业放入队列中即可被运行
async fn init_listeners(pg_url: &str) -> Result<Listener> {
Listener::builder(pg_url, move |notif, conn| {
async move {
let block = queries::get_full_block_by_id(conn, notif.id).await?;
let b: (B, u32) = BlockModelDecoder::with_single(block)?;
crate::tasks::execute_block::<B, R, C, D>(b.0, PhantomData).enqueue(conn).await?;
Ok(())
}
.boxed()
})
.listen_on(Channel::Blocks)
.spawn()
.await
}
具体监听器如何调用到自定义的函数在 listener.rs 中。
其他知识点:
- postrage 的触发器与 pg_notify() 函数,对应的库用的是 sqlx
- 异步任务框架 coil
- actor 框架 xtra
1
共 1 条评论, 1 页
评论区
写评论讲讲背景知识?