Procesamiento paralelo de archivos grandes en Python


Imagen por autor

Para el procesamiento paralelo, dividimos nuestra tarea en subpartes. Aumenta el número de trabajos procesados ​​por el programa y reduce el tiempo total de procesamiento.

Por ejemplo, si está trabajando con un archivo CSV grande y desea modificar una sola columna. Introduciremos los datos como una matriz a la función y procesará múltiples valores en paralelo según el número disponible. trabajadores. Estos trabajadores se basan en la cantidad de núcleos en su CPU.

Nota: El uso de procesamiento paralelo en una base de datos más pequeña no mejorará el tiempo de procesamiento.

En este blog aprenderemos cómo reducir el tiempo de procesamiento de archivos grandes usando multiprocesamiento, lugar de trabajoy: gracias Paquetes de Python. Es un tutorial simple que se puede aplicar a cualquier archivo, base de datos, imagen, video y audio.

Nota: Usamos el cuaderno Kaggle para experimentos. El tiempo de procesamiento puede variar de una máquina a otra.

Usaremos la base de datos de accidentes de EE. UU. (2016 – 2021) de Kaggle, que consta de 2,8 millones de registros y 47 columnas.

vamos a importar multiprocessing, jobliby: tqdm para procesamiento en paralelo, pandas para capturas de datosy: re, nltky: string para procesamiento de texto.

# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion 
import pandas as pd

# Text Processing 
import re 
from nltk.corpus import stopwords
import string

Antes de saltar directamente, sentémonos n_workers duplicando cpu_count(). Como puede ver, tenemos 8 empleados.

n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

>>> 8 workers are available

En el siguiente paso, incorporaremos archivos CSV de gran tamaño utilizando pandas read_csv función. Luego imprima la forma del marco de datos, el nombre de la columna y el tiempo de procesamiento.

Nota: La función mágica de Júpiter %%time puede mostrar tiempos de CPU y: tiempo de pared al final del proceso.

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

Resultado:

Shape:(2845342, 47)

Column Names:

Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype="object")

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

Él clean_text es una función simple de procesamiento y limpieza de texto. conseguiremos ingles se detiene usando nltk.copus utilícelo para filtrar palabras de punto de una cadena de texto. Después de eso, eliminaremos los caracteres especiales y los espacios adicionales de la oración. Esa será la función de línea de base que debería determinar el tiempo de procesamiento. serie, paraleloy: lote Procesando.

def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('', '', string.punctuation))
  # removing the extra spaces
  text = re.sub(' +',' ', text)
  return text

Para la serialización podemos usar pandas. .apply() función, pero si desea ver la barra de progreso, debe activarla gracias para pandas y luego usarlo .progress_apply() función.

Vamos a procesar los 2,8 millones de registros y almacenar el resultado en la columna Descripción.

%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)

Resultado:

Tardó 9 minutos y 5 segundos. clase alta Procesamiento de CPU a serie 2,8 millones de líneas.

100%  2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

Hay diferentes formas de paralelizar un archivo y vamos a conocerlas todas. Él multiprocessing es un paquete integrado de python comúnmente utilizado para el procesamiento paralelo de archivos grandes.

Crearemos multiprocesamiento Piscina con 8 empleados y use mapa Función para iniciar el proceso. Para mostrar las barras de progreso, usamos gracias.

La función map consta de dos secciones. El primero requiere una función y el segundo requiere un argumento o una lista de argumentos.

Obtenga más información leyendo la documentación.

%%time
p = mp.Pool(n_workers) 

df['Description'] = p.map(clean_text,tqdm(df['Description']))

Resultado:

Casi hemos mejorado nuestro tiempo de procesamiento 3x. Se reduce el tiempo de procesamiento 9 minutos 5 segundos a: 3 minutos 51 segundos.

100%  2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

Ahora aprenderemos sobre otro paquete de Python para realizar procesamiento en paralelo. En esta sección usaremos joblib Paralelo y: tarde copiar mapa función.

  • Parallel requiere dos argumentos: n_jobs = 8 y backend = multiprocesamiento.
  • Luego agregaremos Borrar texto: a: tarde función.
  • Cree un bucle para alimentar un valor a la vez.

El proceso a continuación es bastante general y puede modificar su función y matriz según sus necesidades. Lo he usado para procesar miles de archivos de audio y video sin ningún problema.

Es recomendado. agregar manejo de excepciones usando try: y: except:

def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(clean_text)
  (text) 
  for text in tqdm(array)
  )
  return result

Agregar una columna “Descripción” text_parallel_clean().

%%time
df['Description'] = text_parallel_clean(df['Description'])

Resultado:

Nuestra función tardó 13 segundos más que el multiprocesamiento Piscina. Incluso entonces, Paralelo 4 minutos 59 segundos más rápido que serie Procesando.

100%  2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

Hay una mejor manera de procesar archivos grandes dividiéndolos en lotes y procesándolos en paralelo. Comencemos por crear una función por lotes que ejecutará un clean_function en un conjunto de valores.

Función de procesamiento por lotes

def proc_batch(batch):
  return [
  clean_text(text)
  for text in batch
  ]

Dividir un archivo en lotes

La siguiente función dividirá el archivo en múltiples lotes según la cantidad de trabajadores. En nuestro caso, obtenemos 8 lotes.

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [
  array[ix:ix+batch_size]
  for ix in tqdm(range(0, file_len, batch_size))
  ]
  return batches

batches = batch_file(df['Description'],n_workers)

>>> 100%  8/8 [00:00<00:00, 280.01it/s]

Procesamiento por lotes en paralelo

Finalmente, utilizaremos Paralelo y: tarde para procesar lotes.

Nota: Para obtener una sola matriz de valores, necesitamos ejecutar la lista de comprensión como se muestra a continuación.

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(proc_batch)
  (batch) 
  for batch in tqdm(batches)
  )

df['Description'] = [j for i in batch_output for j in i]

Resultado:

Hemos mejorado el tiempo de procesamiento. Esta técnica es conocida por procesar datos complejos y entrenar modelos de aprendizaje profundo.

100%  8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm lleva el multiprocesamiento al siguiente nivel. Es simple y poderoso. Se lo recomendaría a todos los científicos de datos.

Consulte la documentación para obtener más información sobre el multiprocesamiento.

Él process_map requiere

  1. Nombre de la función:
  2. Una columna de marco de datos
  3. max_trabajadores:
  4. chucksize es similar al tamaño del lote. Calcularemos el tamaño del lote utilizando la cantidad de trabajadores, o puede aumentar la cantidad según su preferencia.
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)

df["Description"] = process_map(
    clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)

Resultado:

Con una línea de código obtenemos el mejor resultado.

100%  2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

Tienes que encontrar un equilibrio y elegir la técnica que mejor se adapte a tu caso. Puede ser un procesamiento en serie, paralelo o por lotes. El procesamiento en paralelo puede ser contraproducente si está trabajando con una base de datos más pequeña y menos compleja.

En este minitutorial, aprendimos sobre varios paquetes y técnicas de Python que nos permiten paralelizar nuestras funciones de datos.

Si solo trabaja con una base de datos tabular y desea mejorar su eficiencia de procesamiento, le sugiero que pruebe Dask, DataTable y RAPIDS.

Referencia

Abid Ali Awan (@1abidaliawan:) es un científico de datos certificado al que le encanta crear modelos de aprendizaje automático. Actualmente se enfoca en la creación de contenido y escribe blogs técnicos sobre aprendizaje automático y tecnologías de ciencia de datos. Abid tiene una Maestría en Gestión de Tecnología y una Licenciatura en Ingeniería de Telecomunicaciones. Su visión es crear un producto de IA utilizando una red neuronal gráfica para estudiantes que luchan contra enfermedades mentales.



Source link