At Sipfront, we offer automated VoIP tests for telecom systems. Besides the System Under Test, the internet has the biggest impact on a test: outages or re-routing due to, for example, natural disasters can result in lost or delayed network packets. We combine real-time information about the state of the internet with cutting-edge machine learning methods to estimate its influence on the results of a test. This helps us deliver focused insights and recommendations to our customers.
This post is about acquiring routing update frequencies via the RIPEstat API. A high number of routing updates might indicate an underlying problem or instability within the network. We present approaches to querying, processing and storing data from the API while keeping memory usage low and performance high.
We first introduce our specific problem and then show increasingly complex approaches to solving it.
Problem
In essence, we want to extract the number of updates a node on the internet performs. We can query the API for each node, receiving a JSON response that contains a list of updates with timestamps. In simplified form, the response looks like this:
{
  "node_name": "Example Node",
  "updates": [
    {"timestamp": 1662980995},
    {"timestamp": 1662980996},
    {"timestamp": 1662980996},
    {"timestamp": 1662980996},
    {"timestamp": 1662980998},
    ...
    {"timestamp": 1662999112},
    {"timestamp": 1662999112},
    {"timestamp": 1662999113},
    {"timestamp": 1662999113},
    {"timestamp": 1662999113}
  ]
}
Responses vary in length depending on the number of updates; long ones can include over a hundred thousand updates. Since we query the updates of approximately 80,000 nodes, and do so in an environment with only a few GB of RAM, this is not a trivial problem.
Simple Approach
The most straightforward approach consists of iterating over all nodes and aggregating the results.
for node in nodes:
    timestamps = get_node_timestamps(node)
    store_timestamps(node, timestamps)
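The two helper functions here are nothing special. A minimal sketch, assuming the per-node query URL node_api_url used in the later snippets and a plain text file as storage (both simplifications for this post):
import requests

def get_node_timestamps(node):
    # Fetch the response for this node and pull the timestamps out of the
    # parsed JSON (node_api_url is the node's query URL; its construction
    # is omitted here, as in the snippets below).
    response = requests.get(node_api_url)
    return [update["timestamp"] for update in response.json()["updates"]]

def store_timestamps(node, timestamps):
    # Append one line per node; any real storage backend would do.
    with open("timestamps.txt", "a") as f:
        f.write(f"{node} {','.join(str(t) for t in timestamps)}\n")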
For our case, each request has a mean execution time of about 4 seconds, so querying all nodes sequentially would take 4 s * 80,000 = 320,000 s ≈ 88.89 hours. As we need daily updates, this is far too slow.
Medium Approach
Python offers the concurrent.futures module, which can solve the time issue. Its ThreadPoolExecutor class enables us to execute API queries asynchronously.
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    timestamps_concurrent = executor.map(get_node_timestamps, nodes)
    store_timestamps_concurrent(nodes, timestamps_concurrent)
With 16 threads running, we could potentially reduce the time to 88.89 / 16 ≈ 5.6 hours! Perfectly fine for querying daily updates.
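store_timestamps_concurrent is simply the batched counterpart of the store_timestamps helper sketched above; again only a sketch:
def store_timestamps_concurrent(nodes, timestamps_concurrent):
    # executor.map yields results in the same order as the input nodes,
    # so we can zip them back together and reuse the per-node helper.
    for node, timestamps in zip(nodes, timestamps_concurrent):
        store_timestamps(node, timestamps)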
However, initial experiments show that even though this approach is fast, it is memory intensive. If several threads work on long responses, out-of-memory errors occur. A major contribution to memory peaks comes from converting the API response to JSON (more about that here).
response = requests.get(node_api_url)
response_json = json.loads(response.text)
# iterate over response_json to extract timestamps
Instead of converting the response, we utilize the structure of our data and directly extract the timestamps from the raw response with a basic regex. Regexes should be used with caution, as problems could arise if the response format changes or if other parts of the response also include a timestamp.
response = requests.get(node_api_url)
timestamps = re.findall(r'"timestamp": (\d+),', response.content.decode("utf-8"))
This approach is much more memory efficient and also faster.
Advanced Approach
After a few days of happily querying the API at lightning speed, the script again starts throwing memory errors. Closer inspection shows that some responses are enormously long, requiring GBs of RAM.
None of our previous approaches solved the underlying problem: the size of a single response can already be too large for our memory.
Our final solution is to use the streaming option of the Python requests library.
response = requests.get(node_api_url, stream=True)
timestamps = []
for chunk in response.iter_content(chunk_size=2**15):
    timestamps += re.findall(r'"timestamp": (\d+),', chunk.decode("utf-8"))
Now the memory usage of a single thread is bounded, regardless of the size of the response. Streaming is slightly slower, but the memory reduction makes it more than worth it.
Note that, due to chunking, some timestamps might be lost if a timestamp entry is split across two chunks. In our case we can afford to lose a small number of timestamps; other applications might not be as forgiving.
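If losing entries is not acceptable, one possible workaround (a sketch of an alternative, not what we run ourselves) is to carry the unmatched tail of each chunk over into the next one, so that an entry split across a chunk boundary is completed before the regex is applied:
response = requests.get(node_api_url, stream=True)
pattern = re.compile(r'"timestamp": (\d+),')
timestamps = []
tail = ""
for chunk in response.iter_content(chunk_size=2**15):
    text = tail + chunk.decode("utf-8")
    matches = list(pattern.finditer(text))
    timestamps += [m.group(1) for m in matches]
    # Keep everything after the last complete match; the next chunk will
    # complete any entry that was cut off at the boundary.
    tail = text[matches[-1].end():] if matches else text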
Data storage
The intermediate results of the concurrent threads are stored in RAM before they are saved to disk. Over time, this can create a large memory overhead. A straightforward way to keep memory consistently small is batch processing: we split our nodes into batches of 128 and process each batch separately.
nodes_batches = create_batches(nodes, num_per_batch=128)
for nodes_batch in nodes_batches:
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        timestamps_concurrent = executor.map(get_node_timestamps, nodes_batch)
        store_timestamps_concurrent(nodes_batch, timestamps_concurrent)
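The create_batches helper simply slices the node list; a minimal sketch:
def create_batches(nodes, num_per_batch=128):
    # Split the node list into consecutive slices of at most num_per_batch nodes.
    return [nodes[i:i + num_per_batch] for i in range(0, len(nodes), num_per_batch)]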
In practice, this solution can still be prone to memory issues, as the memory used by the threads might not be freed instantaneously (more about that here). To be on the safe side, we create a separate process for each batch of nodes using the multiprocessing library. This ensures that all resources, including the threads, are freed when the process ends.
def process_nodes_batch(nodes_batch):
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        timestamps_concurrent = executor.map(get_node_timestamps, nodes_batch)
        store_timestamps_concurrent(nodes_batch, timestamps_concurrent)

nodes_batches = create_batches(nodes, num_per_batch=128)
# maxtasksperchild=1 makes the pool spawn a fresh process for every batch,
# so each batch's memory is released when its process exits.
with multiprocessing.Pool(processes=1, maxtasksperchild=1) as p:
    _ = p.map(process_nodes_batch, nodes_batches)
Splitting the nodes into batches and running each batch in a separate process yields a slightly longer run time, but the reduction in memory makes it worthwhile.
Conclusion
In this post, we presented several strategies to perform API requests faster and with less memory and computational overhead:
- Concurrent requests
- Regex instead of JSON
- Streaming the response
- Batch execution in separate processes
I hope this helps you in your API querying or web scraping endeavors.