< 返回版块

zqhxuyuan 发表于 2021-03-30 13:34

Tags:coil

coil 的几种用法:

  • 在方法上调用enqueue(conn),其中 conn 是在 smol::block_on() 中创建的:
    #[coil::background_job]
    fn check_arg_equal_to_env(env: &String, arg: String) -> Result<(), PerformError> {
        if env == &arg {
            Ok(())
        } else {
            Err("arg wasn't env!".into())
        }
    }

    let (runner, rx) = TestGuard::runner("a".to_string(), 2);
    smol::block_on(async {
        let mut conn = runner.connection_pool().acquire().await.unwrap();
        check_arg_equal_to_env("a".into())
            .enqueue(&mut conn)
            .await
            .unwrap();
        check_arg_equal_to_env("b".into())
            .enqueue(&mut conn)
            .await
            .unwrap();
        runner.run_all_sync_tasks().await.unwrap();
    });
  • 方法可以带上 conn 作为参数,不过 conn 仍然还是在 smol::block_on() 里创建的
    #[coil::background_job]
    fn takes_env_and_conn_sync(_env: &(), pool: &PgPool) -> Result<(), coil::PerformError> {
        smol::block_on(sqlx::query("SELECT 1").execute(pool))?;
        Ok(())
    }

    let (runner, rx) = TestGuard::dummy_runner();
    smol::block_on(async {
        let mut conn = runner.connection_pool().acquire().await.unwrap();
        takes_env_and_conn_sync().enqueue(&mut conn).await.unwrap();
        runner.run_all_async_tasks().await.unwrap();
    });
  • 先 register_job 注册 Job,后续需要执行的时候,才需要调用方法的enqueue(conn) 开始执行:
	#[coil::background_job]
    fn can_specify_where_clause<S>(_eng: &(), arg: S) -> Result<(), coil::PerformError>
    where
        S: Serialize + DeserializeOwned + std::fmt::Display,
    {
        arg.to_string();
        Ok(())
    }

    let (tx, rx) = channel::bounded(1);
    let runner = TestGuard::builder(())
        .register_job::<can_specify_where_clause::Job<String>>()
        .on_finish(move |_| {
            let _ = smol::block_on(tx.send(coil::Event::Dummy)).unwrap();
        })
        .build();

    smol::block_on(async {
        let mut conn = runner.connection_pool().acquire().await.unwrap();
        can_specify_where_clause("hello".to_string())
            .enqueue(&mut conn)
            .await
            .unwrap();

        runner.run_all_sync_tasks().await.unwrap();
        runner.check_for_failed_jobs(rx, 1).await.unwrap();
    });

substrate archive 的框架图: image.png archive 中 actors 是管理所有 Actor 的入口,其中 Runner 注册了 execute_block() 这个 Job: image.png execute_block() 是真正的业务方法,这个方法接收一个 Env,以及一个区块 block,最终向 StorageActor 发送 Storage 消息: image.png storage 的构建,是从 Block 转换为 Storage的,这里会调用 api.execute_block(id,block) 方法执行区块。

  • api.execute_block() 类似于 node runtime 里 impl_runtime_apis! 对 Core 的 execute_block() 调用
  • api 的 execute_block() 属于 Runtime Api,一旦调用了 Runtime Api,即进入了 Runtime 层
  • 执行完区块后,会产生 storage 的变更信息(storage changes),这样就将 Block 转为了 BlockChange
  • 这里的 Block 是 primitives/runtime 定义的 trait Block,而 BlockChange 是 archive 自定义的结构体

在看代码时,如果出现了 BlockT,表示的是 substrate primitives/runtime 下的 Block trait。

	let storage = {
		// storage scope
		let handler = env
			.tracing_targets
			.as_ref()
			.map(|t| TraceHandler::new(&t, number.into(), hash.as_ref().to_vec(), span_events.clone()));

		let _guard = handler.map(tracing::subscriber::set_default);

		let now = std::time::Instant::now();
		let block = BlockExecutor::new(api, &env.backend, block).block_into_storage()?;
		log::debug!("Took {:?} to execute block", now.elapsed());

		Storage::from(block)
	};

上面 actors.rs 的 main_loop() 中只是注册了 execute_block() 这个 Job。真正的执行流程:

  • Crawl 从 secondly db(rocksdb)读取到一个新的块后,插入到 PostgreSQL DB
  • PostgreSQL DB 定义了一个触发器,当 blocks 表有新数据插入时,触发监听器的调用
  • 触发器传递给监听器的内容为:table、action、id,即哪张表的哪行记录出现了哪种类型的操作(比如插入)
  • 在 main_loop() 的最开始,初始化了监听器的执行逻辑,接受的参数为 Notif、Connection

下面代码表示只关注 Blocks 这个 Channel,对应的值为“blocks_update”。监听器的逻辑为 move 里的内容。

  • 根据 id 查询出指定的 block,然后进行反序列化
  • 调用之前定义的 execute_job() 这个异步作业,调用其 enqueue(),传入 conn
  • 因为在 main_loop() 里调用了 runner.run_all_sync_tasks(),所以只要把作业放入队列中即可被运行
	async fn init_listeners(pg_url: &str) -> Result<Listener> {
		Listener::builder(pg_url, move |notif, conn| {
			async move {
				let block = queries::get_full_block_by_id(conn, notif.id).await?;
				let b: (B, u32) = BlockModelDecoder::with_single(block)?;
				crate::tasks::execute_block::<B, R, C, D>(b.0, PhantomData).enqueue(conn).await?;
				Ok(())
			}
			.boxed()
		})
		.listen_on(Channel::Blocks)
		.spawn()
		.await
	}

具体监听器如何调用到自定义的函数在 listener.rs 中。

其他知识点:

  • postrage 的触发器与 pg_notify() 函数,对应的库用的是 sqlx
  • 异步任务框架 coil
  • actor 框架 xtra

评论区

写评论
Mike Tang 2021-03-30 13:56

讲讲背景知识?

1 共 1 条评论, 1 页