Skip to content

Pipe utils

read_from_pipe(input_pipe)

Reads and parses JSON data from the input pipe.

Parameters:

Name Type Description Default
input_pipe str

The path to the input pipe.

required

Returns:

Type Description
Optional[Union[dict, list]]

The parsed JSON data read from the input pipe, or None if there was an error.

Source code in aria/ops/pipe_utils.py
def read_from_pipe(input_pipe: str) -> Optional[Union[dict, list]]:
    """Reads and parses JSON data from the input pipe.

    The pipe is decoded explicitly as UTF-8 (the encoding JSON interchange
    requires) instead of the platform's locale encoding, so non-ASCII payloads
    are read the same way on every host.

    Any failure (the pipe cannot be opened, the content is not valid JSON, ...)
    is logged and reported to the caller as None rather than raised. Note that a
    pipe containing the JSON literal ``null`` also yields None, so callers cannot
    distinguish that payload from an error.

    Args:
        input_pipe (str): The path to the input pipe.

    Returns:
        Optional[Union[dict, list]]: The data read from the input pipe, or None if there was an error.
    """
    logger.debug(f"Input Pipe: {input_pipe}")
    try:
        # JSON text is UTF-8; do not depend on the process locale to decode it.
        with open(input_pipe, "r", encoding="utf-8") as input_file:
            logger.debug(f"Opened {input_file.name}")
            return json.load(input_file)  # type: ignore[no-any-return]
    except Exception as e:
        # Deliberately broad: this function's contract is "return None on any
        # error", covering I/O failures as well as malformed or undecodable input.
        logger.error("Error when reading from Input Pipe.")
        logger.debug(e)
        return None

write_to_pipe(output_pipe, result)

Serializes data as JSON and writes it to the output pipe.

Parameters:

Name Type Description Default
output_pipe str

The path to the output pipe.

required
result Optional[Union[dict, list]]

The data to write to the output pipe.

required
Source code in aria/ops/pipe_utils.py
def write_to_pipe(output_pipe: str, result: Optional[Union[dict, list]]) -> None:
    """Serializes data as JSON and writes it to the output pipe.

    Errors (the pipe cannot be opened, the data is not JSON-serializable, ...)
    are logged and swallowed; nothing is raised to the caller. The final
    "Finished writing" debug message is emitted only when the write actually
    succeeded, so the log no longer reports success right after an error.

    Args:
        output_pipe (str): The path to the output pipe.
        result (Optional[Union[dict, list]]): The data to write to the output pipe.
    """
    logger.debug(repr(result))
    logger.debug(f"Output Pipe: {output_pipe}")
    try:
        # Explicit UTF-8 keeps the writer consistent with how JSON is meant to be
        # exchanged, independent of the process locale.
        with open(output_pipe, "w", encoding="utf-8") as output_file:
            logger.debug(f"Opened {output_pipe}")
            json.dump(result, output_file)
            logger.debug(f"Closing {output_pipe}")
    except Exception as e:
        # Best-effort by design: report the failure but do not propagate it.
        logger.error("Error when writing to Output Pipe.")
        logger.debug(e)
    else:
        # Reached only if opening, dumping and closing all succeeded.
        logger.debug("Finished writing results to Output Pipe.")