
Imagen por autor
Para el procesamiento paralelo, dividimos nuestra tarea en subpartes. Aumenta el número de trabajos procesados por el programa y reduce el tiempo total de procesamiento.
Por ejemplo, si está trabajando con un archivo CSV grande y desea modificar una sola columna. Introduciremos los datos como una matriz a la función y procesará múltiples valores en paralelo según el número disponible. trabajadores. Estos trabajadores se basan en la cantidad de núcleos en su CPU.
Nota: El uso de procesamiento paralelo en una base de datos más pequeña no mejorará el tiempo de procesamiento.
En este blog aprenderemos cómo reducir el tiempo de procesamiento de archivos grandes usando multiprocesamiento, lugar de trabajoy: gracias Paquetes de Python. Es un tutorial simple que se puede aplicar a cualquier archivo, base de datos, imagen, video y audio.
Nota: Usamos el cuaderno Kaggle para experimentos. El tiempo de procesamiento puede variar de una máquina a otra.
Usaremos la base de datos de accidentes de EE. UU. (2016 – 2021) de Kaggle, que consta de 2,8 millones de registros y 47 columnas.
vamos a importar multiprocessing
, joblib
y: tqdm
para procesamiento en paralelo, pandas
para capturas de datosy: re
, nltk
y: string
para procesamiento de texto.
# Parallel Computing import multiprocessing as mp from joblib import Parallel, delayed from tqdm.notebook import tqdm # Data Ingestion import pandas as pd # Text Processing import re from nltk.corpus import stopwords import string
Antes de saltar directamente, sentémonos n_workers
duplicando cpu_count()
. Como puede ver, tenemos 8 empleados.
n_workers = 2 * mp.cpu_count() print(f"{n_workers} workers are available") >>> 8 workers are available
En el siguiente paso, incorporaremos archivos CSV de gran tamaño utilizando pandas read_csv
función. Luego imprima la forma del marco de datos, el nombre de la columna y el tiempo de procesamiento.
Nota: La función mágica de Júpiter
%%time
puede mostrar tiempos de CPU y: tiempo de pared al final del proceso.
%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
Resultado:
Shape:(2845342, 47) Column Names: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype="object") CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s Wall time: 46.9 s
Él clean_text
es una función simple de procesamiento y limpieza de texto. conseguiremos ingles se detiene usando nltk.copus
utilícelo para filtrar palabras de punto de una cadena de texto. Después de eso, eliminaremos los caracteres especiales y los espacios adicionales de la oración. Esa será la función de línea de base que debería determinar el tiempo de procesamiento. serie, paraleloy: lote Procesando.
def clean_text(text): # Remove stop words stops = stopwords.words("english") text = " ".join([word for word in text.split() if word not in stops]) # Remove Special Characters text = text.translate(str.maketrans('', '', string.punctuation)) # removing the extra spaces text = re.sub(' +',' ', text) return text
Para la serialización podemos usar pandas. .apply()
función, pero si desea ver la barra de progreso, debe activarla gracias para pandas y luego usarlo .progress_apply()
función.
Vamos a procesar los 2,8 millones de registros y almacenar el resultado en la columna Descripción.
%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)
Resultado:
Tardó 9 minutos y 5 segundos. clase alta Procesamiento de CPU a serie 2,8 millones de líneas.
100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s Wall time: 9min 5s
Hay diferentes formas de paralelizar un archivo y vamos a conocerlas todas. Él multiprocessing
es un paquete integrado de python comúnmente utilizado para el procesamiento paralelo de archivos grandes.
Crearemos multiprocesamiento Piscina con 8 empleados y use mapa Función para iniciar el proceso. Para mostrar las barras de progreso, usamos gracias.
La función map consta de dos secciones. El primero requiere una función y el segundo requiere un argumento o una lista de argumentos.
Obtenga más información leyendo la documentación.
%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))
Resultado:
Casi hemos mejorado nuestro tiempo de procesamiento 3x. Se reduce el tiempo de procesamiento 9 minutos 5 segundos a: 3 minutos 51 segundos.
100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s Wall time: 3min 51s
Ahora aprenderemos sobre otro paquete de Python para realizar procesamiento en paralelo. En esta sección usaremos joblib Paralelo y: tarde copiar mapa función.
- Parallel requiere dos argumentos: n_jobs = 8 y backend = multiprocesamiento.
- Luego agregaremos Borrar texto: a: tarde función.
- Cree un bucle para alimentar un valor a la vez.
El proceso a continuación es bastante general y puede modificar su función y matriz según sus necesidades. Lo he usado para procesar miles de archivos de audio y video sin ningún problema.
Es recomendado. agregar manejo de excepciones usando try:
y: except:
def text_parallel_clean(array): result = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (text) for text in tqdm(array) ) return result
Agregar una columna “Descripción” text_parallel_clean()
.
%%time df['Description'] = text_parallel_clean(df['Description'])
Resultado:
Nuestra función tardó 13 segundos más que el multiprocesamiento Piscina. Incluso entonces, Paralelo 4 minutos 59 segundos más rápido que serie Procesando.
100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s Wall time: 4min 4s
Hay una mejor manera de procesar archivos grandes dividiéndolos en lotes y procesándolos en paralelo. Comencemos por crear una función por lotes que ejecutará un clean_function
en un conjunto de valores.
Función de procesamiento por lotes
def proc_batch(batch): return [ clean_text(text) for text in batch ]
Dividir un archivo en lotes
La siguiente función dividirá el archivo en múltiples lotes según la cantidad de trabajadores. En nuestro caso, obtenemos 8 lotes.
def batch_file(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] for ix in tqdm(range(0, file_len, batch_size)) ] return batches batches = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]
Procesamiento por lotes en paralelo
Finalmente, utilizaremos Paralelo y: tarde para procesar lotes.
Nota: Para obtener una sola matriz de valores, necesitamos ejecutar la lista de comprensión como se muestra a continuación.
%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) for batch in tqdm(batches) ) df['Description'] = [j for i in batch_output for j in i]