Encontrar rangos con Numpy

Cuando trabajamos con datos, en muchas ocasiones tenemos que localizar rangos de valores que cumplan ciertas condiciones, normalmente sobrepasar un cierto umbral.

Si nos ponemos a teclear sin pensar usaremos un bucle. Esta opción puede ser la óptima en muchos lenguajes pero, si usamos python, probablmente sea más eficiente usar numpy.

Buscando rangos con numpy

Para encontrar los rangos por los que nos preguntábamos inicialmente primero tenemos que filtrar:

import numpy as np

data = np.cos(np.linspace(0, 10*np.pi, 1000))
threshold = 0
idxs = data > threshold

Con la línea data > threshold obtenemos un vector lógico, el cual nos indica si cada valor sobrepasa (True) o no (False) el umbral. Para entender cómo funciona esta operación hay que tener claro el concepto de broadcasting. Esta técnica se usa mucho en numpy y otros paquetes derivados, por lo que es interesante conocerla. Aunque en este ejemplo es evidente qué es lo ocurre tras bambalinas, en otros casos su aplicación es de todo menos trivial.

Una vez tenemos qué índices están por encima y cuáles por debajo (o son iguales) de nuestro umbral, vamos a intentar conocer en qué puntos pasamos de un lado a otro, esto es, qué puntos nos delimitan los distintos rangos en que podemos nuestra serie de datos original.

ls = np.nonzero(idxs[:-1] != idxs[1:])

También podemos usar np.where(...).

In [24]: %timeit np.nonzero(idxs[:-1] != idxs[1:])
5.98 µs ± 19.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [25]: %timeit np.where(idxs[:-1] != idxs[1:])
5.45 µs ± 84.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Ahora ya tenemos los índices del vector con los datos que estamos estudiando en los que se producen los cruces del valor límite.

In [22]: ls = np.nonzero(idxs[:-1] != idxs[1:])
Out[22]: (array([ 49, 149, 249, 349, 449, 549, 649, 749, 849, 949]),)

In [23]: data[149]
Out[23]: -0.026727084775504988

In [24]: data[150]
Out[24]: 0.004717088593031313

In [26]: data[49]
Out[26]: 0.02987056142625339

In [27]: data[50]
Out[27]: -0.001572368047584414

Es importante darse cuenta que, obviamente, y como podemos ver en las salidas del ejemplo anterior, detectamos tanto los índices en que pasamos el umbral tanto de abajo a arriba como de arriba hacia abajo. También es importante entender cómo hemos logrado detectar dichos índices. Veamos como funciona el código hasta ahora con un ejemplo manejable.

data = [2, 3, 2, 4, 5, 4, 3, 5, 2, 3]

operación
---------
idxs = data > 3

output:
-------
[ 2 > 3      [  False
  3 > 3         False
  2 > 3         False
  4 > 3         True
  5 > 3   =>    True
  4 > 3         True
  3 > 3         False
  5 > 3         True
  2 > 3         False
  3 > 3 ]       False ]

operación
---------
ENCUENTRA ÍNDICES
DONDE             idxs[todos los valores salvo el último]
SON DISTINTOS DE  idxs[todos los valores salvo el primero]

output:
-------
                [ -----
0. [  False       False
1.    False       False
2.    False       True
3.    True        True     Where
4.    True   !=   True    ======> (2, 5, 6, 7)
5.    True        False
6.    False       True
7.    True        False
8.    False       False ]
      ----- ]

Ahora que tenemos los índices, para obtener los rangos como parejas de valores podemos o iterar sobre la colección ls=o usar la función =zip.

In [39]: ls = ls[0]

In [41]: zip(ls[:-1], ls[1:])
Out[41]: <zip at 0x129d2c9c8>

In [42]: list(zip(ls[:-1], ls[1:]))
Out[42]:
[(49, 149),
 (149, 249),
 (249, 349),
 (349, 449),
 (449, 549),
 (549, 649),
 (649, 749),
 (749, 849),
 (849, 949)]

Hay que ser consciente que los rangos están definidos de forma que el primer valor es el índice previo al inicio del rango que cumple nuestra condición, mientras que el segundo es el último valor dentro del rango. En función de esto, tendremos que incrementar o el primer o el segundo valor del par, según si queremos que el rango defina solamente los valores que cumplen la condición, o bien los valores externos al rango de valores que cumplen dicha condición. Personalmente siempre trabajo con el primer caso.

Ahora ya podemos iterar sobre los pares anteriores y modificar la función original como necesitemos.

rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if data[p[0]] > 0]
cp = data.copy()
for pair in rs:
    np.put(cp, np.arange(*pair), 0)

En la primera línea filtramos aquellos pares que nos interesan, en nuestro caso los que delimitan un rango de valores superiores a un cierto umbral. Puede verse que en el segundo argumento de zip sumamos uno. Esto se debe a que la función np.arange es exclusiva por la derecha, por lo que he de incrementar el rango para que modifique el último valor del mismo.

Después, copiamos los valores a un vector auxiliar para poder usar la función put sin destruir nuestros valores de origen. Esta función toma el array a modificar, con los índices que delimitan el rango de valores a sustituir, y otro array de la misma longitud que el anterior con los valores que van a sustituir a los originales. Si pasamos un escalar, otra vez Numpy usará el broadcasting para rellenar todos los índices con el mismo valor.

En vez de un escalar, podemos pasar una función la cual, a partir de los datos originales y los índices a modificar, nos devuelva un valor dependiente de los mismos.

Puesto todo junto nos queda un código bastante compacto:

def range_values_substitution(data, threshold, f=None, limit=0):
     # Podría no hacerse explícito, mejor así por claridad y
     # por evitar casos extremos.
     if f is None:
        f = threshold
    idxs = data > threshold
    ls = np.nonzero(idxs[:-1] != idxs[1:])[0]
    rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if data[p[0]] > threshold and p[1] - p[0] > limit]
    cp = data.copy()

    for pair in rs:
        np.put(cp, np.arange(*pair), f)

    return cp

Hemos modificado el filtrado que nos extrae los rangos que deseamos de forma que, si el rango no tiene un ancho mínimo (p[1] - p[0] > limit), se descarte1.

La función anterior tiene un problema: descarta el primer y el último rango. Para evitarlo, podemos añadir un primer valor de signo contrario al primero valor real en la primera posición del array, y un último también de signo contrario al último valor, y procesar este nuevo vector.

def range_values_substitution(data, threshold, f=None, limit=0):
    if f is None:
        f = threshold
    aux = np.insert(data, [0, len(data)], [-data[0], -data[-1]])
    idxs = aux > threshold
    ls = np.nonzero(idxs[:-1] != idxs[1:])[0]
    rs = [p for p in zip(ls[:-1]+1, ls[1:]+1) if aux[p[0]] > threshold and p[1] - p[0] > limit]
    cp = aux.copy()

    for pair in rs:
        np.put(cp, np.arange(*pair), f)

    return cp[1:-1]

Añadir valores a arrays de Numpy no es una operación muy eficiente. De todas formas, y al ser una operación puntual, salvo que vayamos muy justos de memoria, no nos perjudica en exceso, aunque hay que ser conscientes de que se empeora de forma sustancial el rendimiento. Podemos verlo en el siguiente ejemplo, donde aparecen los tiempos de ejecución si procesamos un vector con un millón de elementos:

# Versión original sin valores añadidos
11.2 ms ± 476 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

# Versión con valores añadidos al inicio y al final
16.5 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

Aunque estamos de enhorabuena, porque nos podemos evitar tener que modificarlo: nuestra función nos da todos los rangos por encima y por debajo de un cierto umbral. Por lo tanto, nos segmenta todo el vector en secciones superiores e inferiores a dicho valor. Sabiendo esto, podemos simplemente añadir al vector que define los límites de los rangos los índices extremos para que se nos puedan crear los rangos iniciales y finales. Con esto, y haciendo la suma fuera del zip, llegamos a una versión ligeramente distinta de la última que presentamos:

def range_values_substitution(data, threshold, f=None, limit=0):
    if f is None:
        f = threshold

    idxs = data > threshold
    ls = list(np.nonzero(idxs[:-1] != idxs[1:])[0] + 1)
    ls = np.array([0] + ls + [data.size-1])
    rs = [p for p in zip(ls[:-1], ls[1:]) if data[p[0]] > threshold and p[1] - p[0] > limit]
    cp = data.copy()

    for pair in rs:
        np.put(cp, np.arange(*pair), f)

    return cp[1:-1]

con un tiempo de ejecución bastante mejorado:

12.5 ms ± 294 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Puede parecer extraño hacer:

ls = list(np.nonzero(idxs[:-1] != idxs[1:])[0] + 1)
ls = np.array([0] + ls + [data.size-1])

Pero justo por lo dicho anteriormente, es mejor pasar el array a una lista, concatenarla (O(1)), y volver a crear un np.ndarray. Seguimos creando uno, pero con insert se creaban dos arrays temporales, lo que explica la diferencia en el tiempo de ejecución2. Las pruebas de rendimiento así lo demuestran.

Otras opciones

Si como variante queremos obtener un único rango entre el máximo y el mínimo valor, la función np.ptp es justo lo que buscamos. Por otra parte np.clip nos pone los valores por encima y debajo de un mínimo y un máximo justo en dichos valores, suficiente para el ejemplo que hemos presentado. Pero para sustituir por valores calculados de forma más compleja, o con el condicionante de rangos de un determinado tamaño, se nos queda corta.

Justo lo que hemos presentado se hace, de una forma en mi opinión un poco críptica, en este hilo de StackOverflow. Haciendo un par de pequeños ajustes a lo que allí tenemos nos queda:

def range_values_substitution_so(data, threshold, f=None, limit=0, st_inc=False):
    if f is None:
        f = threshold

    def islandinfo(y, trigger_val, stopind_inclusive=True):
        y_ext = np.r_[False, y>trigger_val, False]
        idx = np.flatnonzero(y_ext[:-1] != y_ext[1:])
        lens = idx[1::2] - idx[:-1:2]

        return zip(idx[:-1:2], idx[1::2]-int(stopind_inclusive)), lens

    cp = data.copy()
    rs = [p for p in islandinfo(data, threshold, st_inc)[0] if p[1] - p[0] > limit]

    for pair in rs:
         np.put(cp, np.arange(*pair), f)

    return cp

Esta forma sí detecta sin necesidad de modificar el array los rangos ubicados en los extremos, aunque no mejora el rendimiento de nuestra última versión.

12.9 ms ± 689 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Por último ya, siempre que no necesitemos mantener los datos originales, si el put lo hacemos sobre data nos ahorraremos la copia, con lo que conseguiremos reducir aún más el tiempo de ejecución.

Conclusión

La forma que hemos presentado, aunque ha necesitado algunos ajustes, es bastante robusta3, permite obtener y modificar los rangos que deseemos en arrays de millones de elementos de forma rápida y, sobretodo, es bastante clara y legible.


  1. Aunque pueda parecer una funcionalidad inútil, en ocasiones en el trabajo la hemos gastado para poder descartar solo rangos más anchos que un cierto rango de tiempo. Superar dicho rango de tiempo nos indicaba que los valores en ese se estaban generados por razones distintas a aquellas que queríamos estudiar. Al descartar estos rangos y sustituir los valores originales por otros no anómalos, en nuestro caso conseguíamos acelerar el procesamiento posterior. ↩︎

  2. No soy tampoco capaz de asegurar que esta sea la razón al 100%. ↩︎

  3. Al menos en los casos en los que se ha usado hasta ahora. ↩︎