
Retrieve API Data Into Dataframe Using Multi Threading Module

I'm using a third-party API to retrieve 10-minute data over a large number of days for different tags. The current data pull can take up to several minutes depending of course of t

Solution 1:

You can try the approach below. It makes it easy to issue many requests in parallel, provided the server can handle the load:

# thread_map is just a wrapper around concurrent.futures.ThreadPoolExecutor with a nice tqdm progress bar
from tqdm.contrib.concurrent import thread_map, process_map  # multi-threading and multi-processing, respectively

def chunk_list(lst, size):
    """
    Yield successive size-sized chunks from lst (taken from Stack Overflow).
    """
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

import os

for idx, my_chunk in enumerate(chunk_list(huge_list, size=2**12)):
    # which_func_to_call makes the request and wraps the returned response JSON object
    for response in thread_map(which_func_to_call, my_chunk, max_workers=min(32, (os.cpu_count() or 1) + 6)):
        # do something with the response now,
        # and make sure to cache the chunk results as well
        pass
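
For reference, here is a minimal self-contained sketch of the same chunk-then-map pattern using only the standard library. It swaps thread_map for concurrent.futures.ThreadPoolExecutor.map, so there is no progress bar. fetch_one, fetch_all and the tag names are hypothetical stand-ins, not part of the API in the question.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def chunk_list(lst, size):
    """Yield successive size-sized chunks from lst."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]


def fetch_one(tag):
    # Hypothetical stand-in for the real API request; returns a dict-like response.
    return {"tag": tag, "value": len(tag)}


def fetch_all(tags, chunk_size=4):
    max_workers = min(32, (os.cpu_count() or 1) + 6)
    responses = []
    # One pool is reused for every chunk instead of being rebuilt each time.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for chunk in chunk_list(tags, chunk_size):
            # map() yields results in the same order as the inputs.
            responses.extend(executor.map(fetch_one, chunk))
    return responses


tags = [f"tag_{n}" for n in range(10)]
all_responses = fetch_all(tags)
```

Processing one chunk at a time keeps a natural checkpoint after each chunk, which is where the results could be cached.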

Edit 1 :

from functools import partial
startdate = "*-150d"
enddate = '*'
my_new_func = partial(which_func_to_call, startdate=startdate, enddate=enddate)

Now pass my_new_func to thread_map instead. Note that my_new_func accepts a single argument, because startdate and enddate are already bound.
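
To make the effect of partial concrete, here is a small sketch. The signature of which_func_to_call is an assumption (the real function would call the API); here it only echoes its arguments, and the tag names are made up.

```python
from functools import partial


def which_func_to_call(tag, startdate, enddate):
    # Hypothetical stand-in: the real function would request data for `tag`.
    return (tag, startdate, enddate)


# Bind the two date arguments once, by keyword.
my_new_func = partial(which_func_to_call, startdate="*-150d", enddate="*")

# Only the tag is left to supply, which is the shape map-style helpers expect.
results = list(map(my_new_func, ["tag_a", "tag_b"]))
```

The built-in map is used here only to keep the sketch dependency-free; thread_map takes the function and the iterable in the same order.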

Edit 2 :

For caching, I would recommend using the csv module and writing the responses you want to a CSV file rather than using pandas, or you can dump the JSON response as needed. Sample code for a JSON/dict-like response looks like this:

import csv
import os

with open(OUTPUT_FILE_NAME, "a+", newline="") as csvfile:
    # fieldnames = [your_headers_list]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    # The file is opened in append mode, so write the header only once:
    # call writer.writeheader() only while the file is still empty
    for idx, my_chunk in enumerate(chunk_list(huge_list, size=CHUNK_SIZE)):
        for response in thread_map(
            my_new_func, my_chunk, max_workers=min(32, (os.cpu_count() or 1) + 6)
        ):
            # .......
            # build the CSV row as a dict with fieldnames as keys (assumes a dict-like response)
            row = {name: response.get(name) for name in fieldnames}
            writer.writerow(row)
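
One way to handle the "write the header only once" requirement is to check whether the file is new or empty before appending. This is a minimal sketch under that assumption. FIELDNAMES, append_rows and the demo rows are hypothetical names chosen for illustration.

```python
import csv
import os
import tempfile

FIELDNAMES = ["tag", "value"]  # hypothetical column names


def append_rows(path, rows):
    """Append dict rows to a CSV file, writing the header only if the file is new or empty."""
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=FIELDNAMES)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)


# Demo in a temporary directory: two appends, one header.
demo_path = os.path.join(tempfile.mkdtemp(), "cache.csv")
append_rows(demo_path, [{"tag": "tag_a", "value": 1}])
append_rows(demo_path, [{"tag": "tag_b", "value": 2}])

with open(demo_path, newline="") as csvfile:
    cached = list(csv.DictReader(csvfile))
```

Calling append_rows once per chunk means a failed run loses at most the chunk that was in flight.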

Solution 2:

If I understand correctly, you need to check whether getAggregatesPandas executed properly.

You can do it like this:

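import concurrent.futures
import pandas as pd

with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i, startdate, enddate) for i in tags)
    # map() yields results in input order and re-raises any exception a call raised
    results = executor.map(lambda p: gener.getAggregatesPandas(*p), args)
    frames = list(results)
    # another approach: submit each call and collect results as they complete
    # futures = [executor.submit(gener.getAggregatesPandas, i, startdate, enddate) for i in tags]
    # frames = [f.result() for f in concurrent.futures.as_completed(futures)]

# DataFrame.append returns a new frame and was removed in pandas 2.0; concatenate once instead
final_df = pd.concat(frames, ignore_index=False)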
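
To see which individual calls succeeded, one option is to submit each call separately and inspect its future. The sketch below uses only the standard library. get_aggregates is a hypothetical stand-in for gener.getAggregatesPandas that fails for one tag on purpose, and run_all and the tag names are likewise made up.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_aggregates(tag, startdate, enddate):
    # Hypothetical stand-in for gener.getAggregatesPandas; fails for one tag on purpose.
    if tag == "bad_tag":
        raise ValueError(f"no data for {tag}")
    return {"tag": tag, "start": startdate, "end": enddate}


def run_all(tags, startdate, enddate):
    results, failures = {}, {}
    with ThreadPoolExecutor() as executor:
        # Keep a future -> tag mapping so each outcome can be attributed to its tag.
        future_to_tag = {
            executor.submit(get_aggregates, tag, startdate, enddate): tag
            for tag in tags
        }
        for future in as_completed(future_to_tag):
            tag = future_to_tag[future]
            try:
                results[tag] = future.result()  # re-raises the worker's exception
            except Exception as exc:
                failures[tag] = exc
    return results, failures


results, failures = run_all(["tag_a", "bad_tag", "tag_b"], "*-150d", "*")
```

With real DataFrames as return values, the values of results could then be combined with a single pandas.concat call, and failures tells you which tags to retry.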


