Concurrencia en Rust

Tanto concurrencia como paralelismo, no a nivel computación paralela masiva, pero sí acerca de poder lanzar futuros/computaciones asíncronas y que estas se ejecuten de forma paralela en distintos hilos, todo ello gestionado normalmente por el runtime.

Select

Puede ser que queramos ejecutar dos o más tareas y que, cuando una de las dos se complete, terminemos el bloque asíncrono en que se ejecutaban. Para eso se usa la construcción Select. En el caso de tokio, su uso es muy simple con la macro tokio::select!. En este artículo se usa de forma muy sencilla y clara para poder parar un servidor implementado con axum.

En la documentación tenemos el siguiente ejemplo, también muy claro.

  use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

No estamos limitados a dos futuros, sino que podemos controlar hasta 64 en el mismo cuerpo de la macro.

Future

La construcción básica para representar una computación asíncrona es el trait future.

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Cuando se le pregunta, poll responde informando de si ya ha terminado y podemos extraer el valor que ha calculado o no. A este acto se le llama, como en JavaScript, como casi siempre, resolving.

Pin en la firma de poll

Los tipos de Rust pueden clasificarse en dos grupos:

  • seguros para moverse en memoria, lo habitual
  • tipos que no es seguro mover a otras direcciones de memoria

Como se ve en el código de Future, en fn poll(self: Pin<&mut Self>..., la propia instancia ha de ser Pin. Pin hace que sea posible garantizar que un objeto no va a ubicarse en otra dirección de memoria, y simpre se podrá acceder con la inicial que se le asignó cuando se instanció.

Los tipos con auto-referencias no son seguros de mover. Esto no tiene relación estricta con programación concurrente y async/await, pero al final es probablemente donde más afecta este struct.

Cuando tenemos un bloque que genera un futuro, como una función async, o un bloque dentro de tokio::spawn, como por ejemplo esto:

async fn foo() {
    let mut x = [1, 2];
    let y = &mut x;
    bar(y).await; // async fn bar(mut x: &[i32]) { ... }
}

el compilador genera algo parecido a

enum State {
    Waiting, Done // cualquier otro paso que el compilador considere
}

struct FooFuture {
    x: [i32; 3],
    y: *mut [i32, 3],
    state: FooState,
}

Si por ejemplo hacemos una llamada a foo.poll(), el futuro se empezará a ejecutar en el hilo que lo crea. Si volvemos a llamar a foo.poll(), porque tras la primera llamada el futuro aún no está resuelto, entre llamadas puede ser que el runtime haya movido el futuro entre hilos. Su posición de memoria entonces cambiará. Pero y seguirá apuntando a la dirección original de x.

   ORIGINAL
   memoria   instancia
   0x10000   FooFuture { x: [1,2], y: 0x10000, ... }
     \__________________________________|   auto-referencia

   TRAS MOVER
   memoria   instancia
-> 0x10000   memoria libre
|  0x14024   FooFuture { x: [1,2], y: 0x10000, ... }
|________________________________________|   apunta a instancia original

Cuando usamos Pin, este envuelve aquello que queremos impedir que se mueva de forma que, en efecto, siempre estará disponible en la misma dirección. En el ejemplo anterior estaría tras mover también en 0x10000..

Pin envuelve siempre un tipo que puede ser derreferenciado a otro tipo: si envolviese un tipo T arbitrario, podría ser que moviendo el propio Pin se moviese el valor subyacente. Al obligar a usar &mut T, Box<T> o Rc<T>, aunque se mueva el struct Pin, se moverá la referencia del contenedor, no los valores internos, con lo que las auto-referencias guardadas originalmente seguirán siendo válidas.1

Aunque usemos referencias compartidas, o llamemos directamente a &mut x sin guardar dicha referencia en otra variable extra (y en nuestro ejemplo), no cambia nada. Primero, como vimos poll obliga a que la instancia de futuro sea pineada. Segundo, el compilador puede hacer alguna optimización o cambio que sea factible si está seguro que la referencia no cambia. Tercero, esto ya opinión personal, por claridad y congruencia es mejor mantener la misma semántica para todos los casos, y no permitir ciertas casos en unos y otras en otros. Sería más flexible, pero no aportaría mucha ventaja tener dicha flexibilidad, al menos hasta lo que yo sé.

Caso práctico

Todo este estudio de Pin (y Send como se explicará después) se ha hecho por una recomendación del compilador/analizador. Ante este trait

pub trait InsertLog {
    async fn insert_log(&self, request: &ClotrRequest) -> Result<(), String>;
}

rust-analyzer hizo la siguiente observación:

use of `async fn` in public traits is discouraged as auto trait bounds cannot be specified you can suppress this lint if you plan to use the trait only in your own code, or do not care about auto traits like `Send` on the `Future` `#[warn(async_fn_in_trait)]` on by defaultrustcClick for full compiler diagnostic lib.rs(18, 5): you can alternatively desugar to a normal `fn` that returns `impl Future` and add any desired bounds such as `Send`, but these cannot be relaxed without a breaking API change: `impl std::future::Future<Output = `, `> + Send`

No dice que esté mal per-se. Pero recomienda, sobre todo en caso de librerías con API pública, que se especifiquen los auto-traits para facilitar su uso en clientes que exigen que dichos traits se implementen. Y como los bloques que definen futuros no definen cuáles de estos auto-traits implementa el tipo devuelto, de ahí que en este caso se haga esta recomendación.

Devolvemos futuro

La firma del método que recomienda rust-analyzer es ligeramente más complicada, pero se entiende bien:

pub trait InsertLog {
    fn insert_log(&self, request: &Request) -> impl Future<Output = Result<(), String>> + Send;
    async fn insert_log_async(&self, request: &Request) -> Result<(), String>;
}

Devolvemos un futuro, especificando cuál es el tipo de retorno y que en efecto queremos que se el futuro se pueda mover entre hilos gracias a Send.

  • Por qué necesitamos Send

    Send le marca al compilador que un futuro puede moverse entre hilos. Los futuros no lo implementan por defecto, porque dependen de los tipos que capturen, por lo que hay que explicitarlo. Si no capturamos nada sí son Send, si capturamos algo que es Send como String, tipos primitivos, etc, también es Send. Pero si capturamos algo que no es Send, porque no es seguro llamarlo de forma concurrente, no será Send. Por ejemplo

    let r = Rc::new(String::from("foo"));
    tokio::spawn(async move {
        println!("{r:?}");
    });
    

    no es Send porque Rc no es seguro para ejecutar en aplicaciones multi-hilo. Habría que cambiar y en este caso usar Arc.

    Tockio ejecuta de inicio el futuro en el hilo que lo creo. Si no especificamos Send, o no puede ser como vimos antes si capturamos un Rc, seguirá ejecutándolo en él. De esta forma, tenemos algo similar a NodeJS.

Devolvemos futuro dentro de Pin

En el caso anterior no había rastro de Pin2. Pero investigando un poco más llegamos a la solución con Pin

pub trait InsertLog {
  fn insert_log_pinned(&self, request: &Request) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
  fn insert_log(&self, request: &Request) -> impl Future<Output = Result<(), String>> + Send;
  async fn insert_log_async(&self, request: &Request) -> Result<(), String>;
}

Aquí sí se marca que vamos a devolver un tipo pineado que no va a poder moverse a otra dirección de memoria que no sea la asignada originalmente. Así, pasamos de la implementación original con async3

async fn insert_log_async(pool: Pool, log: Request) -> Result<(), String> {
    let mut client = pool.get_handle().await.map_err(|e| format!("{e:?}"))?;

    let block = serde_json::to_string(&log.attributes)
        .map_err(|e| format!("Failed to serialize attributes: {e:?}"))
        .map(|s| {
            Block::new()
                .column("timestamp", vec![log.timestamp])
                .column("level", vec![log.level])
                .column("service_name", vec![log.service_name])
                .column("message", vec![log.message])
                .column("environment", vec![log.environment])
                .column("trace_id", vec![log.trace_id])
                .column("span_id", vec![log.span_id])
                .column("attributes", vec![s])
        })?;

    client
        .insert("logs", block)
        .await
        .map_err(|e| format!("{e:?}"))
}

a la función que devuelve el futuro pero no lo ejecuta, por lo que no necesita ser async

pub fn insert_log(
    pool: Pool,
    log: ClotrRequest,
) -> impl Future<Output = Result<(), String>> + Send {
    async move {
        let mut client = pool.get_handle().await.map_err(|e| format!("{e:?}"))?;

        let block = serde_json::to_string(&log.attributes)
            .map_err(|e| format!("Failed to serialize attributes: {e:?}"))
            .map(|s| {
                Block::new()
                    .column("timestamp", vec![log.timestamp])
                    .column("level", vec![log.level])
                    .column("service_name", vec![log.service_name])
                    .column("message", vec![log.message])
                    .column("environment", vec![log.environment])
                    .column("trace_id", vec![log.trace_id])
                    .column("span_id", vec![log.span_id])
                    .column("attributes", vec![s])
            })?;

        client
            .insert("logs", block)
            .await
            .map_err(|e| format!("{e:?}"))
    }
}

La versión con Pin es igual que esta última, simplemente envolviendo todo el futuro (creado por el bloque async move) en Box::pin.

  pub fn insert_log(
    pool: Pool,
    log: ClotrRequest,
) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
    Box::pin(...)
}

Diferencias entre versiones

Usamos la versión sin Pin. Mucho más clara. En principio más rápida porque no tiene porqué ir a una vtable para ver qué implementación explícita llamar, pero en estos casos de creación de clases/instancias anónimas/opacas, no sé muy bien si hay mucha diferencia, ni tampoco qué ocurre exactamente. Realmente solo hay una única implementación de dyn Future..., por lo que supongo que el compilador sabe optimizar la llamada.

Como ventaja de la versión con Box::Pin, si necesitamos crear un array de futuros a los que llamar más adelante, la versión que devuelve un tipo opaco/sin especificar no sirve, porque el compilador no sabe qué tamaño tendrá cada elemento del vector, mientras que con Box sabe que cada uno tiene el tamaño de un puntero a memoria.

Enlaces


  1. Esto está muy bien explicado en el capítulo 8 de Rust for Rustaceans↩︎

  2. De hecho no es necesario, y la implementación que uso es la que devuelve futuro sin envolver. ↩︎

  3. Al final no uso el trait donde muestro la evolución de las firmas, pero eso es otra historia, y las funciones elegidas son equivalentes a los métodos que usaban anteriormente. ↩︎