
Results API

qm.StreamsManager

StreamsManager(
    service: Union[JobResultServiceApi, JobResultApi],
    capabilities: ServerCapabilities,
    wait_until_func: Optional[
        Callable[[Literal["Done"], float], None]
    ],
)

Access to the results of a QmJob

This object is created by calling QmJob.result_handles
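
For instance (a sketch; job is assumed to be a QmJob instance returned from execution):

    job_results = job.result_handles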

Assuming you have an instance of StreamsManager:

    job_results: StreamsManager

This object is iterable:

    for name, handle in job_results:
        print(name)

It also supports membership tests by name:

    if "somename" in job_results:
        print("somename exists!")
        handle = job_results.get("somename")

fetch_results

fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[
        Mapping[str, Union[int, slice]]
    ] = None,
    item: None = None,
) -> Mapping[str, numpy.typing.NDArray[numpy.generic]]
fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[Collection[str]] = None,
    item: Optional[Union[int, slice]] = None,
) -> Mapping[str, numpy.typing.NDArray[numpy.generic]]
fetch_results(
    wait_until_done: bool = True,
    timeout: float = VERY_LONG_TIME,
    stream_names: Optional[
        Union[
            Mapping[str, Union[int, slice]], Collection[str]
        ]
    ] = None,
    item: Optional[Union[int, slice]] = None,
) -> Mapping[str, numpy.typing.NDArray[numpy.generic]]

Fetch results from the specified streams

PARAMETER DESCRIPTION
wait_until_done

If True, will wait until all results are processed before fetching

TYPE: bool DEFAULT: True

timeout

Timeout for waiting in seconds

TYPE: float DEFAULT: VERY_LONG_TIME

stream_names

A mapping of stream names to indices or slices to fetch, or a collection of stream names to fetch all items from

TYPE: Optional[Union[Mapping[str, Union[int, slice]], Collection[str]]] DEFAULT: None

item

An index or slice to fetch from each stream

TYPE: Optional[Union[int, slice]] DEFAULT: None

RETURNS DESCRIPTION
Mapping[str, NDArray[generic]]

A mapping of stream names to their fetched results as numpy arrays
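
A minimal usage sketch (the stream names "I" and "Q" are placeholders for whatever was declared in stream_processing):

    # Fetch everything from two streams once the job is done
    results = job_results.fetch_results(stream_names=["I", "Q"])
    i_data = results["I"]

    # Fetch only the first 10 items of each stream, without waiting
    partial = job_results.fetch_results(
        stream_names={"I": slice(0, 10), "Q": slice(0, 10)},
        wait_until_done=False,
    )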

get

get(
    name: str,
    /,
    default: Optional[
        Union[BaseSingleStreamFetcher, _T]
    ] = None,
) -> Optional[Union[BaseSingleStreamFetcher, _T]]

Get a handle to a named result from stream_processing

PARAMETER DESCRIPTION
name

The name of the result, as used in stream_processing

TYPE: str

default

The default value to return if the named result is unknown

TYPE: Optional[Union[BaseSingleStreamFetcher, _T]] DEFAULT: None

RETURNS DESCRIPTION
Optional[Union[BaseSingleStreamFetcher, _T]]

A handle object to the result (MultipleNamedJobResult or SingleNamedJobResult), or None if the named result is unknown
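
For example (a sketch; "I" is a placeholder for a name used in stream_processing):

    handle = job_results.get("I")
    if handle is not None:
        data = handle.fetch_all()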

is_processing

is_processing() -> bool

Check if the job is still processing results

RETURNS DESCRIPTION
bool

True if results are still being processed, False otherwise
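
For instance, is_processing can drive a simple polling loop while the job is still producing results (a sketch; the stream name and sleep interval are arbitrary):

    import time

    handle = job_results.get("I")   # "I" is a placeholder stream name
    while job_results.is_processing():
        print(f"{handle.count_so_far()} values saved so far")
        time.sleep(1)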

items

items() -> ItemsView[str, BaseSingleStreamFetcher]

Returns a view, in which the first item is the name of the result and the second is the result

keys

keys() -> KeysView[str]

Returns a view of the names of the results

values

values() -> ValuesView[BaseSingleStreamFetcher]

Returns a view of the results
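
A short sketch using these views:

    for name, handle in job_results.items():
        print(name, handle.count_so_far())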

wait_for_all_values

wait_for_all_values(
    timeout: Optional[float] = None,
) -> bool

Wait until we know all values were processed for all named results

PARAMETER DESCRIPTION
timeout

Timeout for waiting in seconds

TYPE: Optional[float] DEFAULT: None

RETURNS DESCRIPTION
bool

Returns True if all completed successfully; False if any result stream was closed prematurely (e.g., due to job failure or cancellation).
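
A sketch of waiting for all streams to complete before a single fetch (the 600-second timeout is arbitrary):

    if job_results.wait_for_all_values(timeout=600):
        all_data = job_results.fetch_results(wait_until_done=False)
    else:
        print("a result stream was closed before all values arrived")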

qm.BaseSingleStreamFetcher

BaseSingleStreamFetcher(
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    stream_metadata_errors: list[StreamMetadataError],
    stream_metadata: Optional[StreamMetadata],
    capabilities: ServerCapabilities,
    multiple_results_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

job_id property

job_id: str

The job id this result came from

name property

name: str

The name of the result this handle is connected to

stream_metadata property

stream_metadata: Optional[StreamMetadata]

Provides the StreamMetadata of this stream.

Metadata currently includes the values and shapes of the automatically identified loops in the program.

count_so_far

count_so_far() -> int

Also available as len(handle)

RETURNS DESCRIPTION
int

The number of values this result has so far
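
For example (assuming handle is a fetcher obtained from job_results.get):

    n = handle.count_so_far()   # number of values saved so far
    assert n == len(handle)     # len(handle) is equivalent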

fetch

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[numpy.typing.NDArray[numpy.generic]]

Fetch a single result from the current result stream saved in server memory. The result stream is populated by the save() statement.

PARAMETER DESCRIPTION
item

ignored

TYPE: Union[int, slice]

check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NDArray[generic]]

the current result

Example
res.fetch(0)  # return the item in the top position (the item argument is ignored)

fetch_all

fetch_all(
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[numpy.typing.NDArray[numpy.generic]]

Fetch a result from the current result stream saved in server memory. The result stream is populated by the save() and save_all() statements. Note that if save_all() statements are used, calling this function twice may give different results.

PARAMETER DESCRIPTION
check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NDArray[generic]]

all results of the current result stream
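
A short sketch contrasting the default output with flat_struct (handle is assumed to come from job_results.get):

    data = handle.fetch_all()                  # loop dimensions encoded in the dtype
    flat = handle.fetch_all(flat_struct=True)  # loop dimensions encoded in the array shape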

has_dataloss

has_dataloss() -> bool

RETURNS DESCRIPTION
bool

True if there was data loss during job execution

strict_fetch

strict_fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> numpy.typing.NDArray[numpy.generic]

Fetch a result from the current result stream saved in server memory. The result stream is populated by the save() and save_all() statements. Note that if save_all() statements are used, calling this function twice with the same item index may give different results.

PARAMETER DESCRIPTION
item

The index of the result in the saved results stream.

TYPE: Union[int, slice]

check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
NDArray[generic]

A single result if item is an integer, or multiple results if item is a Python slice object.

RAISES DESCRIPTION
Exception

If item is not an integer or a slice object.

StreamProcessingDataLossError

If data loss is detected in the data for the job.

Example
res.strict_fetch(0)          # return the item in the top position
res.strict_fetch(1)          # return the item in position number 2
res.strict_fetch(slice(1,6)) # return items from position 1 to position 6 (exclusive)
                             # same as res.fetch_all()[1:6]

wait_for_all_values

wait_for_all_values(
    timeout: float = VERY_LONG_TIME,
) -> bool

Wait until we know all values were processed for this named result

PARAMETER DESCRIPTION
timeout

Timeout for waiting in seconds

TYPE: float DEFAULT: VERY_LONG_TIME

RETURNS DESCRIPTION
bool

True if the job finished successfully and False if the job was closed before it was done. If the job is still running when the timeout is reached, a TimeoutError is raised.
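
Since a TimeoutError is raised if the job is still running at the timeout, callers may want to guard the wait (a sketch; the 60-second timeout is arbitrary):

    try:
        finished = handle.wait_for_all_values(timeout=60)
    except TimeoutError:
        finished = False   # the job was still running when the timeout expired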

wait_for_values

wait_for_values(
    count: int = 1, timeout: float = VERY_LONG_TIME
) -> None

Wait until we know at least count values were processed for this named result

PARAMETER DESCRIPTION
count

The number of items to wait for

TYPE: int DEFAULT: 1

timeout

Timeout for waiting in seconds

TYPE: float DEFAULT: VERY_LONG_TIME
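
A typical live-fetching loop built on this call (a sketch; the stream name, batch size, and total count are placeholders):

    total = 1000                          # expected number of saved values
    handle = job_results.get("I")         # "I" is a placeholder stream name
    while handle.count_so_far() < total:
        handle.wait_for_values(min(total, handle.count_so_far() + 10))
        partial = handle.fetch_all()      # re-fetch the growing stream
        # ... update a live plot with partial ...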

qm.SingleStreamSingleResultFetcher

SingleStreamSingleResultFetcher(
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    stream_metadata_errors: list[StreamMetadataError],
    stream_metadata: Optional[StreamMetadata],
    capabilities: ServerCapabilities,
    multiple_results_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

A handle to a result of a pipeline terminating with save

fetch

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[numpy.typing.NDArray[numpy.generic]]

Fetch a single result from the current result stream saved in server memory. The result stream is populated by the save() statement.

PARAMETER DESCRIPTION
item

ignored

TYPE: Union[int, slice]

check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NDArray[generic]]

the current result

Example
res.fetch(0)  # return the item in the top position (the item argument is ignored)

fetch_all

fetch_all(
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[numpy.typing.NDArray[numpy.generic]]

Fetch a result from the current result stream saved in server memory. The result stream is populated by the save() and save_all() statements. Note that if save_all() statements are used, calling this function twice may give different results.

PARAMETER DESCRIPTION
check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NDArray[generic]]

all results of the current result stream

qm.SingleStreamMultipleResultFetcher

SingleStreamMultipleResultFetcher(
    job_results: StreamsManager,
    schema: JobResultItemSchema,
    service: Union[JobResultServiceApi, JobResultApi],
    stream_metadata_errors: List[StreamMetadataError],
    stream_metadata: Optional[StreamMetadata],
    capabilities: ServerCapabilities,
    multiple_streams_fetcher: Optional[
        MultipleStreamsFetcher
    ],
)

A handle to a result of a pipeline terminating with save_all

fetch

fetch(
    item: Union[int, slice],
    *,
    check_for_errors: bool = True,
    flat_struct: bool = False
) -> Optional[numpy.typing.NDArray[numpy.generic]]

Fetch a result from the current result stream saved in server memory. The result stream is populated by the save() and save_all() statements. Note that if save_all() statements are used, calling this function twice with the same item index may give different results.

PARAMETER DESCRIPTION
item

The index of the result in the saved results stream.

TYPE: Union[int, slice]

check_for_errors

If True, the function also checks whether run-time errors occurred during program execution and writes an error message to the logger.

TYPE: bool DEFAULT: True

flat_struct

If True, results have a flat structure: dimensions are part of the array shape rather than of the dtype.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[NDArray[generic]]

A single result if item is an integer, or multiple results if item is a Python slice object.

Example
res.fetch(0)          # return the item in the top position
res.fetch(1)          # return the item in position number 2
res.fetch(slice(1,6)) # return items from position 1 to position 6 (exclusive)
                      # same as res.fetch_all()[1:6]