Results API

qm.StreamsManager

StreamsManager(
    service: Union[JobResultServiceApi, JobResultApi],
    capabilities: ServerCapabilities,
    wait_until_func: Optional[
        Callable[[Literal["Done"], float], None]
    ],
)

Access to the results of a QmJob

This object is created by calling QmJob.result_handles

Assuming you have an instance of StreamsManager:

    job_results: StreamsManager

This object is iterable:

    for name, handle in job_results:
        print(name)

You can check whether a name exists:

    if "somename" in job_results:
        print("somename exists!")
        handle = job_results.get("somename")

fetch_results

fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[
        Mapping[str, Union[int, slice]]
    ] = None,
    item: None = None,
) -> Mapping[str, Union[NumpyArray, Optional[NumpyNumber]]]
fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[Collection[str]] = None,
    item: Optional[Union[int, slice]] = None,
) -> Mapping[str, Union[NumpyArray, Optional[NumpyNumber]]]
fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[
        Union[
            Mapping[str, Union[int, slice]], Collection[str]
        ]
    ] = None,
    item: Optional[Union[int, slice]] = None,
) -> Mapping[str, Union[NumpyArray, Optional[NumpyNumber]]]

Fetch results from the specified streams

PARAMETER DESCRIPTION
wait_until_done

If True, will wait until all results are processed before fetching

TYPE: bool DEFAULT: True

timeout

Timeout, in seconds, applied to each API request individually. The overall wait is therefore at least the given value and can be longer.

TYPE: float DEFAULT: VERY_LONG_TIME

stream_names

A mapping of stream names to indices or slices to fetch, or a collection of stream names to fetch all items from

TYPE: Optional[Union[Mapping[str, Union[int, slice]], Collection[str]]] DEFAULT: None

item

An index or slice to fetch from each stream

TYPE: Optional[Union[int, slice]] DEFAULT: None

RETURNS DESCRIPTION
Mapping[str, Union[NumpyArray, Optional[NumpyNumber]]]

A mapping of stream names to their fetched results, represented as numpy arrays or scalars (for streams that return a single value)
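The accepted forms of `stream_names` and `item` can be sketched as follows. This is a minimal illustration, not the library implementation: `FakeStreamsManager` is a hypothetical stand-in that mimics only the documented selection rules, plain lists stand in for numpy arrays, and the `60.0` timeout default replaces `VERY_LONG_TIME`, whose value is not given here.

```python
from collections.abc import Mapping


class FakeStreamsManager:
    """Hypothetical stand-in for qm.StreamsManager (illustration only)."""

    def __init__(self, data):
        self._data = dict(data)

    def fetch_results(self, wait_until_done=True, timeout=60.0,
                      stream_names=None, item=None):
        if stream_names is None:
            # No selection: every stream, with the shared `item` (if any).
            selection = {name: item for name in self._data}
        elif isinstance(stream_names, Mapping):
            # Mapping form: each stream carries its own index or slice.
            selection = dict(stream_names)
        else:
            # Collection form: the shared `item` applies to each named stream.
            selection = {name: item for name in stream_names}
        return {
            name: self._data[name] if index is None else self._data[name][index]
            for name, index in selection.items()
        }


job_results = FakeStreamsManager({
    "I": [float(i) for i in range(10)],
    "Q": [2.0 * i for i in range(10)],
})

everything = job_results.fetch_results()
only_i = job_results.fetch_results(stream_names=["I"])
last_two = job_results.fetch_results(stream_names=["I", "Q"], item=slice(-2, None))
mixed = job_results.fetch_results(stream_names={"I": 0, "Q": slice(0, 3)})
```

With a real `StreamsManager` only the four calls at the bottom apply; the returned mapping is keyed by stream name in each case.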

get

get(
    name: str,
    /,
    default: Optional[
        Union[AnySingleStreamFetcher, _T]
    ] = None,
) -> Optional[Union[AnySingleStreamFetcher, _T]]

Get a handle to a named result from stream_processing

PARAMETER DESCRIPTION
name

The name of the result as used in stream_processing

TYPE: str

default

The default value to return if the named result is unknown

TYPE: Optional[Union[AnySingleStreamFetcher, _T]] DEFAULT: None

RETURNS DESCRIPTION
Optional[Union[AnySingleStreamFetcher, _T]]

A handle object to the results (MultipleNamedJobResult or SingleNamedJobResult), or None if the named result is unknown
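Because `get` returns the default (None unless specified) for an unknown name, callers usually need an explicit check before using the handle. A minimal sketch of that check, where `FakeStreamsManager` and `require_handle` are hypothetical names introduced for illustration and strings stand in for real handle objects:

```python
class FakeStreamsManager:
    """Hypothetical stand-in for qm.StreamsManager (illustration only)."""

    def __init__(self, handles):
        self._handles = dict(handles)

    def __contains__(self, name):
        return name in self._handles

    def get(self, name, /, default=None):
        # Mirrors the documented signature: positional-only name, optional default.
        return self._handles.get(name, default)


def require_handle(job_results, name):
    """Return the handle for `name`, or raise KeyError if it is unknown."""
    handle = job_results.get(name)
    if handle is None:
        raise KeyError(f"no result stream named {name!r}")
    return handle


job_results = FakeStreamsManager({"I": "handle-for-I"})

found = require_handle(job_results, "I")
fallback = job_results.get("missing", "fallback-value")

try:
    require_handle(job_results, "missing")
    missing_raised = False
except KeyError:
    missing_raised = True
```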

is_processing

is_processing() -> bool

Check if the job is still processing results

RETURNS DESCRIPTION
bool

True if results are still being processed, False otherwise

items

items() -> ItemsView[str, AnySingleStreamFetcher]

Returns a view of (name, result) pairs, where the first item is the name of the result and the second is its handle

keys

keys() -> KeysView[str]

Returns a view of the names of the results

values

values() -> ValuesView[AnySingleStreamFetcher]

Returns a view of the results

wait_for_all_values

wait_for_all_values(
    timeout: Optional[float] = None,
) -> bool

Wait until we know all values were processed for all named results

PARAMETER DESCRIPTION
timeout

Timeout for waiting in seconds

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
bool

Returns True if all completed successfully; False if any result stream was closed prematurely (e.g., due to job failure or cancellation).
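The boolean return value is easy to ignore, so a caller may want to keep it next to the fetched data. The sketch below assumes a hypothetical `FakeStreamsManager` stand-in and a hypothetical helper `wait_then_fetch`; neither is part of the library.

```python
class FakeStreamsManager:
    """Hypothetical stand-in for qm.StreamsManager (illustration only)."""

    def __init__(self, results, completes=True):
        self._results = dict(results)
        self._completes = completes

    def wait_for_all_values(self, timeout=None):
        # True: all streams completed. False: a stream closed prematurely.
        return self._completes

    def fetch_results(self, wait_until_done=True):
        return dict(self._results)


def wait_then_fetch(job_results, timeout=30.0):
    """Return (complete, results) so callers can tell partial data apart."""
    complete = job_results.wait_for_all_values(timeout=timeout)
    # The wait already happened above, so do not wait a second time here.
    results = job_results.fetch_results(wait_until_done=False)
    return complete, results


ok, data = wait_then_fetch(FakeStreamsManager({"I": [1.0, 2.0]}))
partial_ok, partial = wait_then_fetch(
    FakeStreamsManager({"I": [1.0]}, completes=False)
)
```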

qm.BaseSingleStreamFetcher

BaseSingleStreamFetcher(
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    capabilities: ServerCapabilities,
    multiple_streams_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

job_id property

job_id: str

The job id this result came from

name property

name: str

The name of the result this handle is connected to

count_so_far

count_so_far() -> int

Equivalent to len(handle)

RETURNS DESCRIPTION
int

The number of values this result has so far
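One use of `count_so_far` is fetching only the items that arrived since the previous call. The following is a sketch under assumptions: `FakeHandle` (with its `push` helper) and `fetch_new_items` are hypothetical, and a plain list stands in for the server-side stream.

```python
class FakeHandle:
    """Hypothetical stand-in for a single-stream fetcher (illustration only)."""

    def __init__(self):
        self._values = []

    def push(self, *values):
        # Test helper that simulates the job saving more values.
        self._values.extend(values)

    def count_so_far(self):
        return len(self._values)

    def __len__(self):
        return self.count_so_far()

    def fetch(self, item, *, check_for_errors=True, flat_struct=False, timeout=None):
        return self._values[item]


def fetch_new_items(handle, already_seen):
    """Return (new_items, updated_count) for items not fetched before."""
    available = handle.count_so_far()
    if available <= already_seen:
        return [], already_seen
    return handle.fetch(slice(already_seen, available)), available


handle = FakeHandle()
handle.push(0.1, 0.2, 0.3)
first_batch, seen = fetch_new_items(handle, 0)
handle.push(0.4)
second_batch, seen = fetch_new_items(handle, seen)
nothing_new, seen = fetch_new_items(handle, seen)
```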

fetch abstractmethod

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False,
    timeout: Optional[float] = None
) -> ReturnedT

Fetches specific results from the current result stream saved in server memory. The result stream is populated by the save() or save_all() statements.

PARAMETER DESCRIPTION
item

The index, or a slice indicating a range, of the result in the stream.

TYPE: Union[int, slice]

check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

timeout

Timeout for waiting in seconds.

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
ReturnedT

The requested result from the stream. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.

Example
res.fetch(0) # return the item in the top position

fetch_all

fetch_all(
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> ReturnedT

Fetches all available results from the current result stream saved in server memory. The result stream is populated by the save() or save_all() statements.

PARAMETER DESCRIPTION
check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
ReturnedT

All available results for the stream. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.
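The difference between dimensions being "part of the type" and "part of the shape" can be illustrated with numpy alone. This sketch does not call the library: the field name `value`, the dtypes, and the buffer length are assumptions chosen for illustration, so inspect `dtype` and `shape` on real results rather than relying on them.

```python
import numpy as np

# Three stream items, each holding a buffer of four samples.
samples = np.arange(12, dtype=np.float64).reshape(3, 4)

# Dimensions as part of the type: one structured record per item, whose
# "value" field is itself a length-4 sub-array.
nested = np.zeros(3, dtype=[("value", np.float64, (4,))])
nested["value"] = samples

# Dimensions as part of the shape: a plain two-dimensional array.
flat = np.asarray(nested["value"])

nested_shape = nested.shape      # one axis: the items
flat_shape = flat.shape          # two axes: items x buffer length
```

The nested form needs a field lookup before ordinary array arithmetic applies, while the flat form can be passed directly to functions such as `np.mean(flat, axis=0)`.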

has_dataloss

has_dataloss() -> bool

Returns True if there was data loss during job execution

strict_fetch

strict_fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False,
    timeout: Optional[float] = None
) -> NumpyArray

Fetches specific results from the current result stream saved in server memory. The result stream is populated by the save() or save_all() statements.

PARAMETER DESCRIPTION
item

The index, or a slice indicating a range, of the result in the stream.

TYPE: Union[int, slice]

check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

timeout

Timeout for waiting in seconds.

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
NumpyArray

The requested result from the stream. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.

RAISES DESCRIPTION
Exception

If item is not an integer or a slice object.

StreamProcessingDataLossError

If data loss is detected in the data for the job.

Example
res.fetch(0)           # return the item in the top position
res.fetch(1)           # return the item in the second position (index 1)
res.fetch(slice(1, 6)) # return items from position 1 to position 6 (exclusive)
                       # same as res.fetch_all()[1:6]
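Since `strict_fetch` raises on data loss, callers typically wrap it. The sketch below is an illustration under stated assumptions: the `StreamProcessingDataLossError` class is a local stand-in defined here (import the real one from the library in practice), and `FakeHandle` and `safe_strict_fetch` are hypothetical names.

```python
class StreamProcessingDataLossError(Exception):
    """Local stand-in for the library exception of the same name."""


class FakeHandle:
    """Hypothetical stand-in for a single-stream fetcher (illustration only)."""

    def __init__(self, values, lost=False):
        self._values = list(values)
        self._lost = lost

    def has_dataloss(self):
        return self._lost

    def strict_fetch(self, item, *, check_for_errors=True,
                     flat_struct=False, timeout=None):
        if not isinstance(item, (int, slice)):
            raise Exception("item must be an int or a slice")
        if self._lost:
            raise StreamProcessingDataLossError("data loss detected")
        return self._values[item]


def safe_strict_fetch(handle, item):
    """Return the requested items, or None when data loss was detected."""
    try:
        return handle.strict_fetch(item)
    except StreamProcessingDataLossError:
        return None


clean = safe_strict_fetch(FakeHandle([1, 2, 3]), slice(0, 2))
lossy_handle = FakeHandle([1, 2, 3], lost=True)
lossy = safe_strict_fetch(lossy_handle, slice(0, 2))
```

Returning None is only one option; re-raising with extra context or repeating the measurement may suit an experiment better.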

wait_for_all_values

wait_for_all_values(
    timeout: float = VERY_LONG_TIME,
) -> bool

Wait until we know all values were processed for this named result

PARAMETER DESCRIPTION
timeout

Timeout for waiting in seconds

TYPE: float DEFAULT: VERY_LONG_TIME

RETURNS DESCRIPTION
bool

True if the job finished successfully and False if the job was closed before it was done. If the job is still running when the timeout is reached, a TimeoutError is raised.
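The three possible outcomes (True, False, or a raised TimeoutError) can be told apart as sketched below. This assumes the exception is Python's built-in `TimeoutError`; `FakeHandle` and `describe_wait` are hypothetical names used only for illustration.

```python
class FakeHandle:
    """Hypothetical stand-in for a single-stream fetcher (illustration only)."""

    def __init__(self, outcome):
        # outcome is one of "done", "closed", or "timeout".
        self._outcome = outcome

    def wait_for_all_values(self, timeout=60.0):
        if self._outcome == "timeout":
            raise TimeoutError("job still running")
        return self._outcome == "done"


def describe_wait(handle, timeout=10.0):
    """Map the three outcomes of wait_for_all_values to a short label."""
    try:
        finished = handle.wait_for_all_values(timeout=timeout)
    except TimeoutError:
        return "still running"
    return "finished" if finished else "closed early"


labels = [describe_wait(FakeHandle(o)) for o in ("done", "closed", "timeout")]
```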

wait_for_values

wait_for_values(
    count: int = 1, timeout: float = VERY_LONG_TIME
) -> None

Wait until we know at least count values were processed for this named result

PARAMETER DESCRIPTION
count

The number of items to wait for

TYPE: int DEFAULT: 1

timeout

Timeout for waiting in seconds

TYPE: float DEFAULT: VERY_LONG_TIME
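`wait_for_values` combines naturally with `StreamsManager.is_processing` to process data while a job is still running. A minimal sketch of such a loop follows; both `Fake*` classes and the `running_means` helper are hypothetical, and the fake handle produces values on demand so that the example terminates.

```python
class FakeHandle:
    """Hypothetical stand-in that yields `total` values, one per wait."""

    def __init__(self, total):
        self.total = total
        self._values = []

    def count_so_far(self):
        return len(self._values)

    def wait_for_values(self, count=1, timeout=60.0):
        while len(self._values) < count:
            if len(self._values) >= self.total:
                raise TimeoutError("no more values will arrive")
            self._values.append(float(len(self._values)))

    def fetch_all(self, *, check_for_errors=True, flat_struct=False):
        return list(self._values)


class FakeStreamsManager:
    """Hypothetical stand-in for qm.StreamsManager (illustration only)."""

    def __init__(self, handle):
        self._handle = handle

    def get(self, name, /, default=None):
        return self._handle

    def is_processing(self):
        return self._handle.count_so_far() < self._handle.total


def running_means(job_results, name):
    """Collect the mean of all values seen so far after each new value."""
    handle = job_results.get(name)
    means, seen = [], 0
    while job_results.is_processing():
        handle.wait_for_values(seen + 1, timeout=5.0)
        values = handle.fetch_all()
        seen = len(values)
        means.append(sum(values) / seen)
    return means


means = running_means(FakeStreamsManager(FakeHandle(total=3)), "I")
```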

qm.SingleStreamSingleResultFetcher

SingleStreamSingleResultFetcher(
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    capabilities: ServerCapabilities,
    multiple_streams_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

A handle to a result of a pipeline terminating with save

fetch

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False,
    timeout: Optional[float] = None
) -> Optional[NumpyArrayOrSingleValue]

Fetches the results from the current result stream saved in server memory. The result stream is populated by the save() statement.

PARAMETER DESCRIPTION
item

The index, or a slice indicating a range, of the result in the stream.

TYPE: Union[int, slice]

check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

timeout

Timeout for waiting in seconds.

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
Optional[NumpyArrayOrSingleValue]

The current result. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.

Example
res.fetch(0) # return the item in the top position

fetch_all

fetch_all(
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[NumpyArrayOrSingleValue]

Fetches the results from the current result stream saved in server memory. The result stream is populated by the save() statement.

PARAMETER DESCRIPTION
check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NumpyArrayOrSingleValue]

The current result from the stream. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.

qm.SingleStreamMultipleResultFetcher

SingleStreamMultipleResultFetcher(
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    capabilities: ServerCapabilities,
    multiple_streams_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

A handle to a result of a pipeline terminating with save_all, or to a legacy save (save using a tag, instead of a stream)

fetch

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False,
    timeout: Optional[float] = None
) -> NumpyArray

Fetches specific results from the current result stream saved in server memory. The result stream is populated by the save() or save_all() statement (or by a legacy save statement).

PARAMETER DESCRIPTION
item

The index, or a slice indicating a range, of the result in the stream.

TYPE: Union[int, slice]

check_for_errors

If True, also checks whether run-time errors occurred during program execution and, if so, writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

Results will have a flat structure - dimensions will be part of the shape and not of the type

TYPE: bool DEFAULT: False

timeout

Timeout for waiting in seconds.

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
NumpyArray

The requested result from this stream. This can be a single numpy scalar or a numpy array, depending on the stream operators that were used. For example, using a buffer or 'with_timestamps' will return a numpy array.

Example
res.fetch(0)           # return the item in the top position
res.fetch(1)           # return the item in the second position (index 1)
res.fetch(slice(1, 6)) # return items from position 1 to position 6 (exclusive)
                       # same as res.fetch_all()[1:6]
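The indexing rules in the example above follow ordinary Python sequence semantics: indices are zero-based and the slice stop is exclusive. A small sketch with a hypothetical `FakeHandle`, where a plain list stands in for the stream contents:

```python
class FakeHandle:
    """Hypothetical stand-in for a save_all result handle (illustration only)."""

    def __init__(self, values):
        self._values = list(values)

    def fetch(self, item, *, check_for_errors=True, flat_struct=False, timeout=None):
        return self._values[item]

    def fetch_all(self, *, check_for_errors=True, flat_struct=False):
        return list(self._values)


res = FakeHandle([10, 11, 12, 13, 14, 15, 16, 17])

top = res.fetch(0)               # first saved item
second = res.fetch(1)            # second saved item
window = res.fetch(slice(1, 6))  # indices 1, 2, 3, 4, 5 (stop is exclusive)
same_window = res.fetch_all()[1:6]
```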