How can I create a data loader that populates a dictionary in the background?

I am trying to loop over a set of graphs, as shown in this snippet from the main script:

import pickle
for timeOfDay in range(1440):  # total number of minutes in a day=1440
    # timeOfDay is an int, so format it into the filename rather than
    # concatenating it with str (which raises TypeError).
    with open(f"G_{timeOfDay}.pickle", "rb") as handle:
        G = pickle.load(handle)
    ## do something with G
    

I could load all the graphs, G_1.pickle through G_1440.pickle, into RAM, but doing so exhausts my memory. So I tried to create a dictionary G_RAM keyed by minute of the day (up to 1440 keys), where G_RAM[i] holds the graph loaded from G_{i}.pickle. However, this runs into a whole lot of issues: sometimes it gets stuck, and sometimes it does not delete the older graphs at the right time. Is there a standard way to do this without reinventing the wheel? Any help is appreciated. Thanks in advance!

Currently, I have a background thread that does the following:

  1. reads the global value of the timeOfDay variable;
  2. uses N processes to populate G_RAM for the keys timeOfDay to timeOfDay + N, and deletes every key G_RAM[i] with i < timeOfDay.

Note that the loop above cannot be parallelized, because the graph in G_i.pickle must be processed before the one in G_{i+1}.pickle.

My attempt is reproduced below:

import functools
import gc
import multiprocessing
import pickle
import threading
import time

# ---- Prefetch tuning --------------------------------------------------------
# How many minutes, starting at the current timeOfDay, the loader tries to
# keep resident in G_RAM.
HORIZON_PARALLEL_LOADING_OF_GRAPHS = 10
# Size of the multiprocessing.Pool used to unpickle graphs (these are worker
# processes, despite the "THREADS" in the name).
THREADS_PARALLEL_LOADING_OF_GRAPHS = 5

# ---- Shared mutable state (GLOBAL VARIABLES) --------------------------------
# Minute of the day the main loop is currently on; the loader reads it to
# decide what to prefetch and what to evict.
timeOfDay = 0

# Cache of loaded graphs: minute -> graph unpickled from disk.
G_RAM = {}

# Minutes already handed to worker processes. Looking at G_RAM alone is not
# enough: a worker may still be loading a key that has not been stored yet.
already_being_worked_at = []




# Inspired by https://amalgjose.com/2018/07/18/run-a-background-function-in-python/
def background(f):
    '''
    a threading decorator
    use @background above the function you want to run in the background

    Each call of the decorated function starts a new threading.Thread that runs
    f(*args, **kwargs) and returns that started Thread immediately; call
    .join() on it to wait for completion. Callers that ignore the return
    value behave exactly as before.
    '''
    @functools.wraps(f)  # keep f's __name__/__doc__ for debugging and tracebacks
    def backgrnd_func(*a, **kw):
        thread = threading.Thread(target=f, args=a, kwargs=kw)
        thread.start()
        return thread
    return backgrnd_func


def mp_worker(key):
    """Unpickle and return the graph stored for minute *key*.

    Used as the target of the pool's map in mp_handler, so it runs inside a
    worker process.

    Returns:
        A ``(key, graph)`` tuple so the parent can file each result under its key.
    """
    # Same "G_<minute>.pickle" naming as the main loop; a bare "G<minute>"
    # prefix would never find those files.
    # NOTE(review): pickle.load can execute arbitrary code -- only load trusted files.
    with open(f"G_{key}.pickle", "rb") as handle:
        read_data = pickle.load(handle)
    return (key, read_data)


def mp_handler(data):
    """Load the graphs for every key in *data* in parallel and store them in G_RAM.

    Runs mp_worker over *data* on a pool of THREADS_PARALLEL_LOADING_OF_GRAPHS
    worker processes and blocks until all keys are done. An exception raised
    by a worker propagates to the caller. The pool is shut down either way,
    and nothing is stored in that case.
    """
    pool = multiprocessing.Pool(THREADS_PARALLEL_LOADING_OF_GRAPHS)
    try:
        res = pool.map(mp_worker, data)
    finally:
        # Always release the worker processes, even if a load fails.
        pool.close()
        pool.join()

    # populating the global dict here: mp_worker returns (key, graph) pairs,
    # which dict.update accepts directly (mutation, so no `global` needed).
    G_RAM.update(res)




@background
def updateDict():
    """Run one prefetch/evict pass over G_RAM (in its own thread via @background).

    Takes a single snapshot of the global ``timeOfDay``, then:
      1. evicts every cached graph outside the window
         [timeOfDay, timeOfDay + HORIZON_PARALLEL_LOADING_OF_GRAPHS),
         clipped to the 1440-minute day;
      2. loads, via mp_handler, every key in that window that is neither
         cached nor already being loaded.

    This is a single pass; call it again whenever ``timeOfDay`` advances.
    """
    global already_being_worked_at

    minutes_per_day = 1440

    time.sleep(0.01)

    # Snapshot once: the main thread keeps advancing the global while this
    # pass runs, and every step below must agree on the same window.
    now = timeOfDay
    window_end = min(now + HORIZON_PARALLEL_LOADING_OF_GRAPHS, minutes_per_day)

    # remove the graphs outside the window (used ones before `now`, and any
    # beyond the horizon). pop(..., None) tolerates another thread having
    # already removed the same key.
    stale = [key for key in list(G_RAM) if not now <= key < window_end]
    for key in stale:
        G_RAM.pop(key, None)
    if stale:
        gc.collect()  # once per pass, not once per deleted key

    # Keys in the window that are neither cached nor already being loaded.
    in_flight = set(already_being_worked_at)
    data = [
        key
        for key in range(now, window_end)
        if key not in G_RAM and key not in in_flight
    ]
    if not data:
        return

    # Mark the keys *before* loading: mp_handler blocks until loading is done,
    # so marking afterwards would never stop an overlapping pass from
    # scheduling the same keys again.
    already_being_worked_at = already_being_worked_at + data
    scheduled = set(data)
    try:
        mp_handler(data)
    finally:
        # Loaded keys now live in G_RAM; keys whose load failed become
        # eligible for a retry on a later pass.
        already_being_worked_at = [
            key for key in already_being_worked_at if key not in scheduled
        ]


### main script follows
# The guard stops worker processes that re-import this module (e.g. under the
# "spawn" start method used on Windows/macOS) from re-running the script.
if __name__ == "__main__":
    for dayNumber in range(100):
        already_being_worked_at = [] # reset the list of in-progress keys at the start of the new day

        for timeOfDay in range(1440):  # total number of minutes in a day=1440
            # Advance the background prefetch window to the current minute.
            updateDict()

            try:
                G = G_RAM[timeOfDay]
            except KeyError:
                # Not prefetched (yet): fall back to reading it from disk.
                with open(f"G_{timeOfDay}.pickle", "rb") as handle:
                    G = pickle.load(handle)
            ## do something with G

Source: Python-3x Questions

LEAVE A COMMENT