When saving data from a QUA program, it is first saved in the OPX on-board memory. From there, it is transferred to the server and eventually to the user PC. Stream processing makes it possible to process data while it is being transferred to the QM server. This reduces both the amount of data that has to be saved on the user PC and the post-processing computation time.
Consider, as a simple example, characterizing the success rate of preparing a qubit in the \(|1 \rangle\) state. We play the preparation pulse, read out the qubit's state and decide whether it is \(|1\rangle\) or \(|0\rangle\). We then repeat this a large number of times, say \(1 \times 10^5\). The individual results of all \(10^5\) experiments are not particularly interesting. Instead, we can collect only the final average over the set of experiments, or track how this average evolves as data accumulates.
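As a rough illustration of what keeping only the average means, the plain-Python sketch below (not QUA code, and not part of the QM API) models a running average over single-shot outcomes. It assumes each shot has already been thresholded to 1 for \(|1\rangle\) and 0 for \(|0\rangle\); the shot values are made up.

```python
def running_average(outcomes):
    """Yield the mean of all outcomes received so far.

    Each outcome is assumed to be 1 (qubit found in |1>) or 0 (found in |0>).
    Only a running sum and a counter are kept, never the full list of shots.
    """
    total = 0
    for n, outcome in enumerate(outcomes, start=1):
        total += outcome
        yield total / n


# Five hypothetical shots; the last value is the estimated success rate.
history = list(running_average([1, 1, 0, 1, 1]))
print(history[-1])  # 0.8
```

Only two numbers are held at any time, which is why this kind of reduction is cheap to perform while data is still streaming.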
A running average is a straightforward example, but many other manipulations are possible. Stream processing allows arithmetic operations and data reshaping to take place on the server, in parallel with the experiment running on the OPX.
Basic Syntax and Examples
To use the stream processing feature, a stream must be defined. A stream consists of a stream variable, a pipeline and a terminal. The processed stream items, referred to as results, can be accessed on the client PC by creating result handles. In what follows, we introduce each of these components and explain how they are used.
To initiate a stream, we declare a stream variable inside a QUA program with `declare_stream()`, for example `my_stream = declare_stream()`.
To pass a variable to a stream, we use the `save()` statement or, for raw ADC samples, the `measure()` statement. This creates a data transfer path through which stream items are processed and then saved to either permanent or overriding storage (see the Glossary for more details).
To illustrate how pipelines and terminals are created and used, we show below a full QUA program that saves data to streams, manipulates them and stores the results in terminals. You can also take a look at a few examples in our GitHub library.
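```python
from qm.qua import (
    assign, declare, declare_stream, fixed, measure, program, save, stream_processing,
)

with program() as prog:
    my_stream1 = declare_stream(adc_trace=True)  # raw ADC traces
    my_stream2 = declare_stream()                # values of a QUA variable
    a = declare(fixed)
    assign(a, 0.3)
    save(a, my_stream2)
    measure('my_pulse', 'qe', my_stream1)

    with stream_processing():
        # Analog input 1 with timestamps; keeps only the latest item
        my_stream1.input1().with_timestamps().save('adc_results')
        # Keeps every value of `a`
        my_stream2.save_all('a_results')
```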
Here two streams are created. The first, `my_stream1`, is used to stream raw ADC samples. The second, `my_stream2`, is used to stream the value of the QUA variable `a`.
The pipelines and terminals are defined inside the `with stream_processing()` context. Pipelines start with a stream variable and end with a `save()` or `save_all()` function, which acts as a terminal. In this example the pipelines are simple: the first selects only the data from analog input 1 and adds timestamps to it, while the second is terminated immediately.
The terminal used for the pipeline initiated with `my_stream1` is a save terminal. This is a memory-less terminal that holds only the last value transferred; this can result in data loss if the client PC does not fetch the data quickly enough. The terminal is given the tag `'adc_results'`, which can later be referenced using a result handle.
The terminal for `my_stream2` is a `save_all` terminal, which stores all the results (up to a memory limitation, see Server PC storage and data limitations). The tag for this terminal is `'a_results'`.
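The difference between the two terminal types can be modelled with two small classes. `SaveTerminal` and `SaveAllTerminal` below are hypothetical stand-ins written for this sketch, not classes from the QM API; they only capture the overriding versus appending behavior described above.

```python
class SaveTerminal:
    """Model of an overriding terminal: keeps only the last item received."""

    def __init__(self):
        self.value = None

    def push(self, item):
        self.value = item  # each new item overrides the previous one


class SaveAllTerminal:
    """Model of an appending terminal: keeps the complete history."""

    def __init__(self):
        self.values = []

    def push(self, item):
        self.values.append(item)  # history grows with every item


save_t, save_all_t = SaveTerminal(), SaveAllTerminal()
for item in [0.3, 0.4, 0.5]:
    save_t.push(item)
    save_all_t.push(item)

print(save_t.value)       # 0.5
print(save_all_t.values)  # [0.3, 0.4, 0.5]
```

If a reader of the overriding terminal only looks after the third push, the first two values are gone, which is the data-loss risk mentioned above.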
To access the results on the client PC side, we create a result handle. This allows us to retrieve ("fetch") the stream items on the client PC, or to perform other query or storage operations on them. Continuing our example, a handle to the `'a_results'` terminal is obtained from the running job with `job.result_handles.get('a_results')`.
Calling the `fetch_all()` method on this handle collects all results contained in the stream. Alternatively, we can get only the most recent result by calling the `fetch()` method.
Note that both these methods are called on a result handle. This structure and its usage are described below.
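It is still possible to use the syntax from previous versions, which creates a stream that is terminated only with `save_all()` (see the legacy save mechanism below). For example, the legacy statement `save(a, 'a_results')` is equivalent to `save(a, my_stream2)` combined with `my_stream2.save_all('a_results')` in the stream processing block.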
Saving values to the same stream from different OPXs is not supported.
Streaming Raw ADC Results
Raw ADC traces are acquired by passing a stream declared with `adc_trace=True` to a `measure()` statement, as done with `my_stream1` in the example above. By setting `adc_trace=True`, we specify that data should be grouped into individual ADC traces rather than passed on sample by sample, such that each trace has a size of `pulse_len + 2*smearing` samples. Next, to populate the stream with results, we specify the relevant stream in the `measure()` statement. Finally, in the `stream_processing` context, the pipeline acquires the data from analog input 1 and saves it with a tag (`'adc_results'` in the example above), which we can later refer to on the client PC.
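Declaring a stream with `adc_trace=True` is equivalent to applying `.buffer(pulse_len + 2*smearing)` to it. See the section on `buffer()` below for details on buffering, and the documentation on smearing for details on that parameter.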
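The trace-length rule can be sketched by chopping a flat list of samples into consecutive traces of `pulse_len + 2*smearing` samples. This is a plain-Python model, not the server implementation; `split_into_traces` is a hypothetical helper, the numbers are illustrative, and samples that do not fill a complete trace are simply dropped here.

```python
def split_into_traces(samples, pulse_len, smearing):
    """Group raw ADC samples into traces of pulse_len + 2*smearing samples.

    Trailing samples that do not fill a whole trace are dropped in this sketch.
    """
    trace_len = pulse_len + 2 * smearing
    return [
        samples[i:i + trace_len]
        for i in range(0, len(samples) - trace_len + 1, trace_len)
    ]


# Hypothetical numbers: 8-sample pulse, smearing of 2 -> 12-sample traces.
traces = split_into_traces(list(range(24)), pulse_len=8, smearing=2)
print(len(traces), len(traces[0]))  # 2 12
```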
To record a raw ADC stream, a digital marker associated with the measurement element must be played. Only samples that arrive while the digital marker is on are recorded in the stream pipeline. This means it is possible to "gate" the raw ADC stream by using different sequences of digital waveforms in the readout operation used for the `measure()` statement.
The digital waveform only affects the raw ADC streams. It does not change the data used when processing the measurement using integration, demodulation etc.
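The gating rule can be modelled as a filter over the sample stream. The sketch below is a simplification that assumes one marker value per ADC sample; `gate_adc_samples` is a hypothetical helper, not part of the QM API, and the sample values are invented.

```python
def gate_adc_samples(samples, marker):
    """Keep only the ADC samples that arrive while the digital marker is on.

    `samples` and `marker` are equal-length sequences; marker[i] is truthy
    when the digital marker is high during sample i.
    """
    if len(samples) != len(marker):
        raise ValueError("samples and marker must have the same length")
    return [s for s, on in zip(samples, marker) if on]


# A marker that is on for the first two and the last two samples only.
samples = [10, 11, 12, 13, 14, 15]
marker = [1, 1, 0, 0, 1, 1]
print(gate_adc_samples(samples, marker))  # [10, 11, 14, 15]
```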
The stream items stored in the save and save_all terminals on the server PC can be fetched to the client PC using the `result_handles` property of the job object. The fields of the `result_handles` property contain references to the stream terminals, and their names are the same as the tags given to the terminals.
A handle to a specific terminal with tag `"my_result"`, for example, is accessed with `my_result = job.result_handles.get("my_result")`. A shorthand notation for this is `my_result = job.result_handles.my_result`.
We can query both the state of a single result handle or that of the collection of all results handles.
For example, one can query the processing state with the `wait_for_all_values()` method, which suspends Python execution until either a timeout occurs or saving has completed.
If the handle references results from a save terminal, we can also wait for a specific number of results to arrive using the `wait_for_values()` method.
The number of stream items referenced by a specific result handle can be obtained by calling
on that handle, or equivalently by calling
Results are saved in NumPy format to a local variable using the `fetch()` and `fetch_all()` methods, as described below.
We can also save results to the file system by calling
fetch and fetch_all
To transfer the results from the server PC to the client PC, the `fetch()` and `fetch_all()` commands are called on a result handle, for example `job.result_handles.my_stream.fetch_all()`, where `my_stream` is the result tag given to the save/save_all terminal.
`fetch()` collects the most recent result in the stream and `fetch_all()` collects all saved data. When a save terminal is used, `fetch_all()` is equivalent to `fetch()`.
It is also possible to collect only a portion of the saved data by passing `fetch()` an index or a slice object. For example, say your result stream contains [0,1,2,3,4,5,6,7,8,9]. Then `fetch(0)` returns 0, `fetch(4)` returns 4 and `fetch(slice(2,5))` returns [2,3,4]. Any valid Python slice object can be used.
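Assuming `fetch()` follows standard Python indexing, as stated above, a plain list shows what each argument selects; note that the stop index of a slice is excluded. The list and the `fetch` helper here are stand-ins, not the QM result handle.

```python
results = list(range(10))  # stand-in for a stored result [0, 1, ..., 9]


def fetch(data, item):
    """Mimic indexing a stored result with an int or a slice object."""
    return data[item]


print(fetch(results, 0))                     # 0
print(fetch(results, 4))                     # 4
print(fetch(results, slice(2, 5)))           # [2, 3, 4]
print(fetch(results, slice(None, None, 3)))  # [0, 3, 6, 9]
```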
When fetching data while using `resume()` with a `save()` command, there is no guarantee that the data you fetch at a given moment is the data point from the current iteration. There can be some delay from the moment the data is acquired until it is available for fetching.
Moreover, the streams are asynchronous. For example, the result I[j] might become available a few moments before Q[j]. If you fetch the data before it is available in the Q stream, you will end up with a mixed data point I[j], Q[j-1].
It is therefore better to use the `save_all()` command and fetch the desired data.
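The mismatch can be illustrated with two hypothetical snapshots of I and Q results taken mid-run, where the Q value of the latest iteration has not arrived yet. Pairing by index over the iterations present in both streams, which `save_all()` makes possible, avoids mixing iterations. The values are invented for the illustration.

```python
# Hypothetical snapshot of two save_all results taken mid-run: the I stream
# already holds iteration 3, while the matching Q value has not arrived yet.
I_results = [0.10, 0.11, 0.12, 0.13]
Q_results = [0.20, 0.21, 0.22]

# Taking the latest item of each stream (what a save terminal would hold)
# mixes iterations: I[3] is paired with Q[2].
mixed = (I_results[-1], Q_results[-1])

# With the full history, pair by index and keep only complete iterations.
n_complete = min(len(I_results), len(Q_results))
aligned = list(zip(I_results[:n_complete], Q_results[:n_complete]))
latest_aligned = aligned[-1]

print(mixed)           # (0.13, 0.22)
print(latest_aligned)  # (0.12, 0.22)
```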
Using Stream Operators
Data Restructuring with buffer()
The `buffer()` method reshapes the incoming stream items according to the dimensions passed to it. For example, `stream.buffer(2)` results in a stream of number pairs, and `stream.buffer(2, 2)` results in a stream of 2 by 2 matrices. This is useful inside a doubly nested for loop, as we can loop over a pair of independent variables and save the resulting dependent variable with the correct structure. We can then perform additional processing. For example, we can perform a running average over the resulting matrix by appending `.average()` to the buffered stream, as in `stream.buffer(2, 2).average()`.
This allows us to collect the resulting matrix and observe in real time how the average evolves with subsequent runs.
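The reshaping and running average can be modelled in plain Python. `buffer` and `running_average_matrices` are hypothetical helpers written for this sketch, not QM functions; unlike the server, which also reports partially filled buffers, this sketch keeps only complete buffers.

```python
def buffer(items, *shape):
    """Group a flat list into consecutive nested lists of the given shape."""
    size = 1
    for dim in shape:
        size *= dim

    def nest(flat, dims):
        if len(dims) == 1:
            return list(flat)
        step = len(flat) // dims[0]
        return [nest(flat[i * step:(i + 1) * step], dims[1:]) for i in range(dims[0])]

    # Only complete buffers are formed; leftover items are dropped.
    return [nest(items[i:i + size], shape) for i in range(0, len(items) - size + 1, size)]


def running_average_matrices(matrices):
    """Yield the element-wise mean of all 2-D buffers received so far."""
    acc = None
    for n, m in enumerate(matrices, start=1):
        if acc is None:
            acc = [[0.0] * len(row) for row in m]
        for r, row in enumerate(m):
            for c, v in enumerate(row):
                acc[r][c] += v
        yield [[v / n for v in row] for row in acc]


items = list(range(1, 9))
print(buffer(items, 2))     # [[1, 2], [3, 4], [5, 6], [7, 8]]
print(buffer(items, 2, 2))  # [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
print(list(running_average_matrices(buffer(items, 2, 2)))[-1])  # [[3.0, 4.0], [5.0, 6.0]]
```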
It is not possible to process more than 100e6 buffered variables at the same time. This limit applies to the total number of variables streamed at the same time.
For example, in the following snippet of code, the first line can be processed by itself, but adding the second line drives the system beyond its limitations:
Streaming a buffered variable with timestamps is akin to streaming double the number of variables. When using `with_timestamps()`, each variable should be counted as two variables when considering this limitation.
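A quick way to check a planned set of streams against this limit is to multiply out each buffer shape and count timestamped streams twice. The sketch below assumes that "buffered variables" means the product of the buffer dimensions, summed over all streams processed at the same time; the helper names and shapes are hypothetical.

```python
BUFFERED_VARIABLE_LIMIT = 100e6  # limit quoted in the text above


def streamed_variable_count(streams):
    """Count buffered variables across streams processed at the same time.

    `streams` is a list of (buffer_shape, with_timestamps) pairs. A stream
    with timestamps counts twice, following the rule described above.
    """
    total = 0
    for shape, with_timestamps in streams:
        size = 1
        for dim in shape:
            size *= dim
        total += 2 * size if with_timestamps else size
    return total


def within_limit(streams):
    return streamed_variable_count(streams) <= BUFFERED_VARIABLE_LIMIT


one_stream = [((1000, 60000), False)]                  # 60e6 variables
two_streams = one_stream + [((1000, 50000), False)]    # 110e6 variables
print(within_limit(one_stream), within_limit(two_streams))  # True False
```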
Combining streams using the zip() operator
Streams can be zipped together into tuples of results using the `zip()` stream operator. This is similar to creating a buffer, but it combines data from two separate streams rather than reshaping a single one.
The resulting zipped stream has a shape dictated by the number of zipped tuples. Each element of a tuple is a named field whose name contains the index i of the zipped stream it came from.
For example, zipping two streams of n items each gives a result whose shape is based on the number of items, n, with two named fields, one per zipped stream.
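The tuple structure can be pictured with Python's built-in `zip`. In this sketch, position i in each tuple plays the role of named field i; requiring equal-length inputs is a simplifying assumption of the hypothetical `zip_streams` helper, not documented server behavior.

```python
def zip_streams(*streams):
    """Combine equal-length streams into a list of tuples.

    Element i of each tuple comes from the i-th stream, mirroring the
    "named field i" description. Equal lengths are required in this sketch.
    """
    if len({len(s) for s in streams}) > 1:
        raise ValueError("all streams must have the same length in this sketch")
    return list(zip(*streams))


stream_a = [1, 2, 3]
stream_b = [10, 20, 30]
zipped = zip_streams(stream_a, stream_b)
print(len(zipped))              # 3: one tuple per item
print(zipped[1])                # (2, 20)
print([t[1] for t in zipped])   # field 1 -> [10, 20, 30]
```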
Mapping operations over streams with the map() function
The `map()` function allows you to perform an operation on each element in a stream. It becomes especially useful when combined with the `buffer()` function, because operations can then be performed on each filled buffer. For example, one can generate a buffer of size 3 and average over the samples in each triplet of numbers in the buffer, using `stream.buffer(3).map(FUNCTIONS.average())`.
There are multiple FUNCTIONS that can be used in this way, for example `FUNCTIONS.average()` and `FUNCTIONS.multiply_by(scalar_or_vector)`. Both of these have shortcut operators, so you can simply use `.average()` or `.multiply_by()` as an operator in a stream pipeline. However, not all functions have a shortcut, and it is useful to learn what the options are.
Some of these functions are `convolution` and `tuple_multiply`, but there are others. You can use Python's introspection, for example `dir(FUNCTIONS)`, to see what is available.
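The per-buffer average obtained by buffering into triplets and mapping an average over each buffer can be modelled as below. This is plain Python, not QUA; `map_average_over_buffers` is a hypothetical helper, and incomplete trailing buffers are dropped in this sketch. It averages within each buffer, in contrast to a running average across successive buffers.

```python
def map_average_over_buffers(items, size):
    """Split a stream into consecutive buffers of `size` items and average each."""
    buffers = [items[i:i + size] for i in range(0, len(items) - size + 1, size)]
    return [sum(b) / size for b in buffers]


print(map_average_over_buffers([1, 2, 3, 10, 20, 30], 3))  # [2.0, 20.0]
```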
Average on axis
When working with multi-dimensional buffers, one can specify over which axis to average. For example, for a three-dimensional buffer, averaging over the first (zeroth) axis is done by passing the axis index to the averaging function, as in `FUNCTIONS.average(0)` inside `map()`.
A list of axes can also be passed, in which case averaging is done over all passed axes. This matches the behavior of NumPy, for example.
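Since the averaging is said to match NumPy, averaging over the zeroth axis of a three-dimensional buffer corresponds to `numpy.mean(x, axis=0)`: the slices along the first axis are averaged element-wise. The pure-Python sketch below (a hypothetical helper, used to avoid a NumPy dependency) shows the resulting shape and values for a 2 x 2 x 2 example.

```python
def average_axis0(buffer3d):
    """Average a nested 3-D list over its first (zeroth) axis.

    The result has the shape of one slice, like numpy.mean(x, axis=0).
    """
    n = len(buffer3d)
    first = buffer3d[0]
    return [
        [sum(block[r][c] for block in buffer3d) / n for c in range(len(first[r]))]
        for r in range(len(first))
    ]


x = [
    [[1, 2], [3, 4]],
    [[5, 6], [7, 8]],
]
print(average_axis0(x))  # [[3.0, 4.0], [5.0, 6.0]]
```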
Flat data structure option
The data structure returned by calling `fetch_all()` is a NumPy structured array. In simple cases, where buffering is not used, the shape of this array is given exactly by the number of saved items. If timestamps are also included, there is one column of values and a second column with timestamps.
If, however, the data is buffered or manipulated in some way, the number of items in the output array is the number of filled (or partially filled) buffers. For example, consider a stream buffered with `.buffer(5, 10)`.
If 300 items enter the stream, the shape of the output NumPy array is (6,), since 6*5*10 = 300. Each item of the array has shape 5 x 10, but because the inner dimensions are "hidden", it cannot be indexed in the way you might expect.
To modify this behavior, an option to "flatten" the result array is available, enabled by passing `flat_struct=True` to `fetch_all()`.
In this case, the result has shape (6, 5, 10) and can be indexed as a usual NumPy array.
When the `flat_struct` flag is used on zipped streams, each named field has a shape as described for a single stream.
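The relationship between the 300 incoming items and the flattened (6, 5, 10) result can be checked with a row-major reshape in plain Python. This models the indexing only, assuming items fill buffers in row-major order; `reshape` is a hypothetical helper standing in for `numpy.reshape`.

```python
def reshape(flat, shape):
    """Reshape a flat list into nested lists, row-major, like numpy.reshape."""
    if len(shape) == 1:
        return list(flat)
    step = len(flat) // shape[0]
    return [reshape(flat[i * step:(i + 1) * step], shape[1:]) for i in range(shape[0])]


items = list(range(300))                 # 300 stream items
buffer_shape = (5, 10)                   # as in .buffer(5, 10)
n_buffers = len(items) // (5 * 10)       # 6 filled buffers
flat = reshape(items, (n_buffers,) + buffer_shape)

print(len(flat), len(flat[0]), len(flat[0][0]))  # 6 5 10
print(flat[1][2][3])  # 73, i.e. item 1*50 + 2*10 + 3
```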
Stream Processing Arithmetics
It is possible to use the server to perform arithmetic operations on a single stream or between different streams.
A complete list of the possible operations can be found in the API reference for `_ResultStream`. In general, an operation can be performed between a stream and itself, between two different streams, or between a stream and a scalar. Operations are done element-wise, so both streams need to be buffered identically. In the case of an operation between a stream and a scalar, the operation must be done before buffering the stream.
It is possible to multiply a buffered stream by a vector using the `multiply_by()` operator. The vector length must be identical to the stream buffer size.
Some simple syntax examples:
```python
# Independent examples; each line belongs inside a `with stream_processing():` block.
(stream + 1).save("example")  # add 1 to each element in the stream
(stream_1 + stream_2).buffer(5).save("example")  # add element-wise, then buffer
(stream_1.buffer(5) + stream_2.buffer(5)).save("example")  # buffer first, then add
# A combination of stream and scalar arithmetic, applied before buffering
((stream_1 + 1) * 3 + (stream_2 + 1) / 2).buffer(5).average().save_all("example")
```
Notice that the second and third examples above produce identical outputs.
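The claim that the second and third examples give identical outputs follows from the operations being element-wise. The plain-Python model below (hypothetical helpers, not QUA stream objects) adds two streams and then buffers them, buffers and then adds, and compares the results.

```python
def add_streams(a, b):
    """Element-wise sum of two equal-length lists."""
    return [x + y for x, y in zip(a, b)]


def buffer_flat(items, size):
    """Split a list into consecutive complete chunks of `size` items."""
    return [items[i:i + size] for i in range(0, len(items) - size + 1, size)]


def add_buffered(a_buffers, b_buffers):
    """Element-wise sum of two identically buffered streams."""
    return [add_streams(x, y) for x, y in zip(a_buffers, b_buffers)]


stream_1 = list(range(10))
stream_2 = list(range(100, 110))

line_two = buffer_flat(add_streams(stream_1, stream_2), 5)                     # add, then buffer
line_three = add_buffered(buffer_flat(stream_1, 5), buffer_flat(stream_2, 5))  # buffer, then add
print(line_two == line_three)  # True
```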
An operation between two streams of different total lengths is performed only as many times as there are items in the shorter stream, pairing items by their order within each stream, regardless of the location of the `save()` commands in the QUA program.
For example, if `stream1` has n items and `stream2` has m items, where m < n, the result of `(stream1+stream2).save_all()` is an array of length m, holding the element-wise sums of the first m elements of each stream.
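Python's built-in `zip` stops at the shorter input, which mirrors the behavior described above. The sketch models streams as plain lists with made-up values.

```python
stream1 = [1, 2, 3, 4, 5]   # n = 5 items
stream2 = [10, 20, 30]      # m = 3 items, m < n

# zip() stops at the shorter input, so only m element-wise sums are produced.
result = [x + y for x, y in zip(stream1, stream2)]
print(result)  # [11, 22, 33]
```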
A simple example that shows the utility of stream arithmetic is calculating the variance of a measured variable during the measurement. The figure below shows the results of repeatedly generating a random integer between 0 and 9 and streaming two streams as written below:
where the first line streams the running average (blue in the figure) and the second line calculates the variance and streams the result (orange in the figure).
Fig. 2: Running average stream (blue) and variance stream (orange) of a randomly generated number on the OPX+. The horizontal lines mark the theoretical average (blue) and variance (orange).
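The figure's pipeline is not reproduced here, but one way to obtain a running variance from averages alone is \(\mathrm{Var}(x) = \langle x^2 \rangle - \langle x \rangle^2\). The plain-Python sketch below models this with running sums; whether the figure used exactly this formula is an assumption. For a uniform random integer between 0 and 9, the theoretical mean is 4.5 and the variance is \((10^2 - 1)/12 = 8.25\).

```python
import random


def running_mean_and_variance(samples):
    """Yield (mean, variance) after each sample using Var = <x^2> - <x>^2.

    Only two running sums are kept: one of x and one of x*x.
    """
    sum_x = 0.0
    sum_x2 = 0.0
    for n, x in enumerate(samples, start=1):
        sum_x += x
        sum_x2 += x * x
        mean = sum_x / n
        yield mean, sum_x2 / n - mean * mean


rng = random.Random(1)
draws = [rng.randint(0, 9) for _ in range(100_000)]  # simulated random integers
for mean, var in running_mean_and_variance(draws):
    pass  # keep only the final values
print(round(mean, 2), round(var, 2))  # close to 4.5 and 8.25
```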
In the following example, we see the use of the histogram. Using time tagging, we get an array of timestamps indicating at which points in the measurement we counted a pulse (simulating a photon). By repeating the time tagging measurement, we can stream the results into a histogram that tells us how many pulses were counted in each time bin.
The above code line shows how to use the histogram in stream processing, where `meas_len` is the measurement length and `resolution` is the required histogram bin size.
Fig. 3: A histogram of the number of counts as a function of measurement time. The time bin resolution is 20 ns.
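The binning step can be modelled in plain Python, assuming integer timestamps in ns and half-open bins of width `resolution` starting at 0. `histogram_counts` is a hypothetical helper; it is not the QUA histogram operator, and the bin-edge convention of the real operator may differ. The timestamps are invented.

```python
def histogram_counts(timestamps_per_run, meas_len, resolution):
    """Accumulate time stamps from repeated runs into fixed-width bins.

    Bin k covers [k*resolution, (k+1)*resolution) ns; stamps outside
    the binned range are ignored.
    """
    n_bins = meas_len // resolution
    counts = [0] * n_bins
    for stamps in timestamps_per_run:
        for t in stamps:
            if 0 <= t < n_bins * resolution:
                counts[t // resolution] += 1
    return counts


runs = [[5, 25, 41], [19, 22], [61]]  # three repetitions of the measurement
print(histogram_counts(runs, meas_len=80, resolution=20))  # [2, 2, 1, 1]
```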
Prior to the introduction of explicitly declared streams, QUA's save mechanism saved directly to a tag, using `save(var, 'tag')`, where `'tag'` is a string identifier.
When using this legacy saving mechanism the output can be retrieved without change, as below:
This retrieves both values and timestamps. Alternatively, if only values (or only timestamps) are needed, we have introduced the following notation:
This provides only values (or only timestamps) and is slightly more performant than the previous option. In future versions, `flat_struct=True` will become the default option.
Server PC storage and data limitations
A `save_all` terminal stores all of the stream items created during the execution of a QUA program as results in the permanent memory of the server PC.
The permanent memory of the server PC is large but not unlimited, and therefore it is possible in some cases
to create more results than can be stored at the server PC.
The server PC permanent memory can store up to 100GB of result data without danger of data loss.
If more than 100GB are stored in the permanent memory, the data may be erased after a short time to ensure the permanent storage is not overfilled. To ensure that job results are not erased, always make sure that no more than 100GB of data are generated by the QUA program without being fetched.
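A back-of-the-envelope check of how much unfetched data a program will produce can be done before running it. The sketch reads 100GB as \(100 \times 10^9\) bytes and takes the number of bytes per stored value as an input; both are assumptions, as the storage format per value is not specified here, and the helper name is hypothetical.

```python
STORAGE_LIMIT_BYTES = 100e9  # "100GB", read here as 10**9-byte gigabytes (assumption)


def unfetched_bytes(n_items, values_per_item, bytes_per_value):
    """Estimate the result data a save_all terminal accumulates before fetching."""
    return n_items * values_per_item * bytes_per_value


# Hypothetical run: one million traces of 1000 values, 8 bytes per value.
est = unfetched_bytes(n_items=1_000_000, values_per_item=1000, bytes_per_value=8)
print(est / 1e9, est <= STORAGE_LIMIT_BYTES)  # 8.0 True
```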
The results will not be immediately erased from the server once they are fetched.
To clear the permanent storage and ensure that all of the 100GB are available, use the
Data loss notification
When collecting results from a stream, the system can usually estimate how much data is expected to be received.
For example, if the
measure command plays a pulse with a duration of 1000 samples, we expect to collect
an output of that length. In cases where the expected amount of data does not match the received amount,
an error message will appear when the
.fetch() commands are called.
It can also be manually checked by calling
Stream processing - introduction
Stream processing - save and save_all
Stream processing - buffer
Stream processing - averaging
Stream processing - average within a buffer
Stream processing - buffer.average.buffer
Stream processing - advanced example (part 1)
Stream processing - advanced example (part 2)
Glossary
Stream Variable
- A source (or start point) of a pipeline, which receives values of QUA variables or raw input samples as input.
Save Terminal
- An overriding named sink (or end point) of a pipeline that creates a result. Each new stream item received at this terminal overrides the previous one. The result is a single item, the last one received in a stream.
Using a save terminal can result in data loss. It is to be used only in cases where data loss is acceptable, such as streaming plots.
Save All Terminal
- A named sink (or end point) of a pipeline that creates a result which stores all the stream items in the pipeline. All stream items are appended to the result, so the complete history of the stream is preserved.
Pipeline
- An ordered collection of operations that is performed on an input stream to return a processed version of the stream (output stream). In the context of stream processing, it is the collection of operations that can be performed starting from a stream variable and terminating in a "save terminal" or "save all terminal" node.
Stream
- An ordered set of values that are sent from a QUA program into a pipeline. A stream is set up when a QUA program is started and is torn down when it is finished.
Stream Item
- A single discrete unit of data of a stream.
- Can be one of the following:
- ICP result
- Single input sample from the controller
- An input trace vector (single measure statement)
It is possible to add a timestamp to each item using `with_timestamps()`.
Result
- The last or accumulated stream items that are created by save or save_all terminals, respectively.
Result Handle
- An object through which results can be fetched.