深圳网站建设注意事项,wordpress系统如何用,南京房产网,免费手机端网站模板最近发了个宏愿想写一个做企业金融研究的Python框架。拖出Python一看已经更新到了3.8#xff0c;于是就发现了Python 3.8里新出现的模块#xff1a;multiprocessing.shared_memory。随手写了个测试。生成一个240MB大小的pandas.DataFrame#xff0c;然后转换成numpy.recarra…最近发了个宏愿想写一个做企业金融研究的Python框架。拖出Python一看已经更新到了3.8于是就发现了Python 3.8里新出现的模块multiprocessing.shared_memory。随手写了个测试。生成一个240MB大小的pandas.DataFrame然后转换成numpy.recarray。这个DataFarme里包括了datetime整型和字符串类型的列。使用numpy.recarray的目的是为了保存dtype这样才能在子进程中正确从共享内存里读数据。 我在子进程中简单地使用numpy.nansum来做计算。第一种方法是使用共享内存第二种方法是直接将numpy.recarray作为参数传递给子进程。 下图为测试代码的输出。可以看出使用共享内存的第一种方法只使用了可以忽略不计的内存并且2秒结束战斗。传参数的方法使用了1.8GB的内存并且慢得要命花费200多秒。当然这跟我使用的测试机是一台2017年的12寸MacBook 4-core i5 8G RAM已停产有可能不过侧面也说明在数据足够大的时候尽量避免没必要的复制和传递还是很有效的。测试代码如下from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import current_process, cpu_count
from datetime import datetime
import numpy as np
import pandas as pd
import tracemalloc
import timedef work_with_shared_memory(shm_name, shape, dtype):print(fWith SharedMemory: {current_process()})# Locate the shared memory by its nameshm SharedMemory(shm_name)# Create the np.recarray from the buffer of the shared memorynp_array np.recarray(shapeshape, dtypedtype, bufshm.buf)return np.nansum(np_array.val)def work_no_shared_memory(np_array: np.recarray):print(fNo SharedMemory: {current_process()})# Without shared memory, the np_array is copied into the child processreturn np.nansum(np_array.val)if __name__ __main__:# Make a large data frame with date, float and character columnsa [(datetime.today(), 1, string),(datetime.today(), np.nan, abc),] * 5000000df pd.DataFrame(a, columns[date, val, character_col])# Convert into numpy recarray to preserve the dtypesnp_array df.to_records(indexFalse)del dfshape, dtype np_array.shape, np_array.dtypeprint(fnp_arrays size{np_array.nbytes/1e6}MB)# With shared memory# Start tracking memory usagetracemalloc.start()start_time time.time()with SharedMemoryManager() as smm:# Create a shared memory of size np_arry.nbytesshm smm.SharedMemory(np_array.nbytes)# Create a np.recarray using the buffer of shmshm_np_array np.recarray(shapeshape, dtypedtype, bufshm.buf)# Copy the data into the shared memorynp.copyto(shm_np_array, np_array)# Spawn some processes to do some workwith ProcessPoolExecutor(cpu_count()) as exe:fs [exe.submit(work_with_shared_memory, shm.name, shape, dtype)for _ in range(cpu_count())]for _ in as_completed(fs):pass# Check memory usagecurrent, peak tracemalloc.get_traced_memory()print(fCurrent memory usage {current/1e6}MB; Peak: {peak/1e6}MB)print(fTime elapsed: {time.time()-start_time:.2f}s)tracemalloc.stop()# Without shared memorytracemalloc.start()start_time time.time()with ProcessPoolExecutor(cpu_count()) as exe:fs [exe.submit(work_no_shared_memory, np_array)for _ in range(cpu_count())]for _ in as_completed(fs):pass# Check memory usagecurrent, peak tracemalloc.get_traced_memory()print(fCurrent memory usage {current/1e6}MB; Peak: {peak/1e6}MB)print(fTime elapsed: {time.time()-start_time:.2f}s)tracemalloc.stop()值得一提的是numpy.ndarray的dtype一定不能是object不然子进程访问共享内存的时候一定segfault但如果在主进程里访问共享内存就没事。 补充更新一下上面的测试代码work_with_shared_memory 函数里不能解引用np_array比如print(np_array)不然会segfault。使用np_array.val和np_array.date则没有问题则是因为这两个column的dtype不是object。而np_array.character_col的dtype在这个代码里是object。 解决这个问题的办法也很简单踩坑无数次后在to_records()里指定dtype。np_array df.to_records(indexFalsecolumn_dtypes{character_col: S6})这里我们指定character_col为长度为6的字符串。 如果是unicode的话可以将S6换成U6。 超出指定长度的字符串则会被truncate。这样就不会有segfault了。重点就是不能有object的dtype。