Source code for problem_bank_scripts.problem_bank_scripts

# Author: Firas Moosvi and Graham Bovett
# Date: 2021-05-09
# This file contains many helper functions that will be used across the question bank project.


# Imports
## Loading and Saving files & others
import errno
import uuid
import json
import pathlib
from collections import defaultdict
from shutil import copy2
import re
import codecs
import importlib.util
import problem_bank_helpers as pbh
import pandas as pd
import warnings
import tempfile
import traceback
import os

## Parse Markdown
import markdown_it
import mdformat.renderer

## Dealing with YAML
import yaml

## Loading files : https://stackoverflow.com/a/60687710
import importlib.resources

from .inputs import INPUT_TYPE_PROCESSORS

## Getting file modification time
import datetime
import git

## Topic Validation

path = pathlib.Path().resolve().as_posix()
topics = {"Template": "000.Template"}  # Start with special cased topics

try:
    subjects = [path.split("instructor_")[1].split("_bank")[0]]
except:
    subjects = ["physics", "datascience", "stats"]

for subject in subjects:
    url = f"https://raw.githubusercontent.com/open-resources/learning_outcomes/main/outputs_csv/LO_{subject}.csv"
    learning_outcomes = pd.read_csv(url)
    topics |= learning_outcomes[["Topic", "Numbered Topic"]].drop_duplicates().values

# Start of reading/parsing functions

def defdict_to_dict(defdict, finaldict):
    """Convert a defaultdict (nested) to a regular dictionary.
        - Answer copied from: https://stackoverflow.com/a/61133504/2217577
    Args:
        defdict (dict): defaultdict
        finaldict (dict): regular dictionary

    Returns:
        dict: Convert to regular dictionary
    """
    # pass in an empty dict for finaldict
    for k, v in defdict.items():
        if isinstance(v, defaultdict):
            # new level created and that is the new value
            finaldict[k] = defdict_to_dict(v, {})
        elif isinstance(v, dict) and v.get("_type", None) == "sympy":
            for k2, v2 in v.items():
                if isinstance(v2, (set, list)):
                    try:
                        v[k2] = sorted(v2, key=str)
                    except:
                        pass
                elif isinstance(v2, dict):
                    v[k2] = dict(sorted(v2.items(), key=lambda i: i[0]))
                
            finaldict[k] = v
        # Somewhere around Aug 2024, something changed in how objects were serialized and anything stored as a `numpy` object wasn't correctly parsed by `pyyaml`
        # See here for our history on this: (https://moosvilab.ok.ubc.ca/moosvilab/pl/zfp63dh6hjy5zdkzemzmtzqffr)
        # And the jupyterbook PR where this was also addressed: https://github.com/jupyter-book/jupyter-book/pull/2167

        elif hasattr(v, "dtype"):
            try:
                finaldict[k] = v.item()
            except Exception:
                finaldict[k] = v
        else:
            finaldict[k] = v
    return finaldict


def parse_body_part(pnum, md_text):
    """Parses markdown and returns a dictionary split by header

    Args:
        md_text (str): A string of markdown format

    Returns:
        parsed_md_text (dict): A dictionary split of the text
    """

    part = "part" + f"{pnum}"

    # Special dict to store stuff
    nested_dict = pbh.create_data2()

    # Create Markdown parser
    mdit = markdown_it.MarkdownIt()
    env = {}
    tokens = mdit.parse(md_text, env)

    # Get Level 2 headers and make sure there's only one!
    level2_headers = [
        i for i, j in enumerate(tokens) if j.tag == "h2" if j.nesting == 1
    ]
    assert (
        len(level2_headers) == 1
    ), "There is a problem in the question, there seem to be multiple level two headers in a body part, or there is a weird edge-case in the parse_body_part() function"

    assert (
        len(tokens[level2_headers[0] + 1].content) < 20
    ), "There is an (arbitrary/opinionated) restriction on the length of 20 chars for a a 'part' title."

    nested_dict[part]["title"] = tokens[level2_headers[0] + 1].content

    # Store the content of the level 2 header
    try:
        content = mdformat.renderer.MDRenderer().render(
            tokens[3 : get_next_headerloc(3, tokens, 3)], mdit.options, env
        )  # Note the 3 is there to exclude header start,header content,header end tokens
        nested_dict[part]["content"] = content.replace(r"\\", "\\")
    except IndexError:
        print(
            "It looks like there is an empty section of header level 2 in your md file."
        )
        raise

    # Get all Level 3 headers
    level3_headers = [
        i for i, j in enumerate(tokens) if j.tag == "h3" if j.nesting == 1
    ]

    for hd in level3_headers:
        header = tokens[hd + 1].content
        assert (
            len(header) < 20
        ), "There is an (arbitrary/opinionated) restriction on the length of 20 chars for a a 'sub-part' title."
        if "Answer Section" in header:
            header = "answer"
        try:
            content = codecs.unicode_escape_decode(
                mdformat.renderer.MDRenderer().render(
                    tokens[hd + 3 : get_next_headerloc(hd + 3, tokens, 3)],
                    mdit.options,
                    env,
                )
            )[
                0
            ]  # Note the +3 is there to exclude header start,header content,header end tokens
            nested_dict[part][header] = content
        except IndexError:
            print(
                "It looks like there is an empty section of header level 3 in your md file."
            )
            # TODO: in the future, suggest ignoring empty sections instead of throwing an error
            raise

    return defdict_to_dict(nested_dict, {})


def get_next_headerloc(start, tokens, header_level):
    """Some annoying code that takes in a token to start looking from, and returns the token at the start of the next header level.

    Args:
        start (int): Token # to start looking from
        tokens (token): Parsed MD file
        header_level (int): "Next" Header level to look for.

    Returns:
        int: Token # to "end" looking at
    """

    # no header found, set close to end
    close = len(tokens)

    for i, j in enumerate(tokens[start:]):
        if j.tag == f"h{header_level}" and j.nesting == 1:
            # next header found
            close = i + start
            break
    return close


def read_md_problem(filepath):
    """Reads a MystMarkdown problem file and returns a dictionary of the header and body

    Args:
        filepath (str): Path of file to read.

    Returns:
        dict: In this dictionary there are four keys containing useful portions of the parsed md file:
            - ``header`` - Header of the problem file (nested dictionary).
            - ``body_parts`` - Body text of the problem file (nested dictionary).
            - ``num_parts`` - Number of parts in the problem (integer).
            - ``body_parts_split`` - Dictionary with each part split into individual components.
    """

    mdtext = pathlib.Path(filepath).read_text(encoding="utf8")

    # Deal with YAML header
    header_text = mdtext.rsplit("---\n")[1]
    header = yaml.safe_load("---\n" + header_text)
    validate_header(header)
    # Deal with Markdown Body
    body = mdtext.rsplit("---\n")[2]

    # Set up the markdown parser
    # to be honest, not fully sure what's going on here, see this issue: https://github.com/executablebooks/markdown-it-py/issues/164

    mdit = markdown_it.MarkdownIt()
    env = {}

    # Set up tokens by parsing the md file
    tokens = mdit.parse(body, env)

    blocks = {}

    block_count = 0

    num_titles = 0

    ###
    for x, t in enumerate(tokens):
        if t.tag == "h1" and t.nesting == 1:  # title
            # oh boy. this is going to break and it will be your fault firas.
            blocks["title"] = [x, x + 3]
            num_titles += 1

        elif t.tag == "h2" and t.nesting == 1:
            block_count += 1

            if block_count == 1:
                blocks[f"block{block_count}"] = [x]
            else:
                blocks[f"block{block_count - 1}"].append(x)
                blocks[f"block{block_count}"] = [x]
    ###
    # Add -1 to the end of the last block
    blocks[f"block{block_count}"].append(len(tokens))

    # Assert statements (turn into tests!)
    assert (
        num_titles == 1
    ), f"I see {num_titles} Level 1 Headers (#) in this file, there should only be one!"
    assert (
        block_count >= 1
    ), f"I see {block_count - 1} Level 2 Headers (##) in this file, there should be at least 1"

    # Add the end of the title block; # small hack
    # blocks['title'].append(blocks['block1'][0])

    # Get the preamble before the parts start
    blocks["preamble"] = [blocks["title"][1], blocks["block1"][0]]

    ## Process the blocks into markdown

    body_parts = {}
    parts_dict = {}

    part_counter = 0

    for k, v in blocks.items():
        rendered_part = (
            mdformat.renderer.MDRenderer()
            .render(tokens[v[0] : v[1]], mdit.options, env)
            .replace(r"\\", "\\")
        )

        if k == "title":
            body_parts["title"] = rendered_part

        elif k == "preamble":
            body_parts["preamble"] = rendered_part

        elif "Rubric" in rendered_part:
            body_parts["Rubric"] = rendered_part

        elif "Solution" in rendered_part:
            body_parts["Solution"] = rendered_part

        elif "Comments" in rendered_part:
            body_parts["Comments"] = rendered_part

        elif "Useful Info" in rendered_part:
            body_parts["Useful_info"] = rendered_part

        else:
            part_counter += 1
            body_parts[f"part{part_counter}"] = rendered_part

            parts_dict.update(parse_body_part(part_counter, rendered_part))

    return_dict = {
        "header": header,
        "body_parts": body_parts,
        "num_parts": part_counter,
        "body_parts_split": parts_dict,
    }
    return defdict_to_dict(return_dict, {})


def _remove_l3_headers(text: str, remove: set[str]) -> str:
    """Removes specific level 3+ headers from a markdown string; useful for removing sections like pl-answer-panel and pl-submission panel for the public version of the site.
    
    Args:
        text (str): Markdown text to process
        remove (set[str]): Set of strings to remove from the markdown text

    Returns:
        str: Processed markdown text
    """
    tokens_to_rerender = []
    next_is_new_header_text = False
    current_header_text = None

    mdit = markdown_it.MarkdownIt()
    env = {}
    tokens = mdit.parse(text, env)

    for token in tokens:
        if token.type == "heading_open":
            next_is_new_header_text = True
            tokens_to_rerender.append(token)
            current_header_text = None
            continue

        if next_is_new_header_text:
            next_is_new_header_text = False
            current_header_text = token.content
        
            if current_header_text in remove:
                tokens_to_rerender.pop()
        
        if current_header_text not in remove:
            tokens_to_rerender.append(token)
    
    return (
        mdformat.renderer.MDRenderer()
        .render(tokens_to_rerender, mdit.options, env)
        .replace(r"\\", "\\")
    )


def dict_to_md(md_dict: dict[str, str], remove_keys=None):
    """Takes a nested dictionary (e.g. output of read_md_problem()) and returns a multi-line string  that can be written to a file (after removing specified keys).
    Args:
        md_dict (dict): A nested dictionary, for e.g. the output of `read_md_problem()`
        remove_keys (list[str], optional): Any keys to remove from the dictionary, for instance solutions. Defaults to removing no keys.

    Returns:
        str: A multi-line string that can be written to a file.
    """

    # md_dict: dict[str, str] = defdict_to_dict(md_dict, {})

    # Question Title and Preamble
    md_string = md_dict.pop("title", "")
    md_string += md_dict.pop("preamble", "")

    _remove = set() if remove_keys is None else set(remove_keys)

    for heading, content in md_dict.items():
        if heading in _remove:
            continue
        
        if _remove and "###" in content:
            md_string += "\n" + _remove_l3_headers(content, _remove)
        else:
            md_string += "\n" + content

    return md_string


## Functions from md-to-pl


def write_info_json(output_path, parsed_question, modified_time: str | None = None):
    """
    Args:
        output_path (Path): [description]
        parsed_question (dict]): [description]
        modified_time (str | None, optional): Last commit timestamp or modified timestamp of the file
    """

    # Deal with optional tags in info.json
    # optional = ""

    optional_keys = {
        "gradingMethod",
        "partialCredit",
        "dependencies",
        "singleVariant",
        "showCorrectAnswer",
        "externalGradingOptions",
        "workspaceOptions"
    }

    # Add tags based on part type
    q_types = []

    for pnum in range(1, parsed_question["num_parts"] + 1):
        part = "part" + f"{pnum}"
        q_types.append(parsed_question["header"][part]["type"])

    auto_tags = []
    if len(q_types) > 1:
        auto_tags.append("multi_part")
    auto_tags.extend(list(set(q_types)))

    if (difficulties := parsed_question["header"].get("difficulty", ["undefined"])) != ["undefined"]:
        if not isinstance(difficulties, list):
            difficulties = [difficulties]
        auto_tags.extend(difficulty.lower() for difficulty in difficulties)

    # tags is technically an optional key for a question author to specify
    auto_tags.extend(parsed_question["header"].get("tags", []))
    auto_tags = [v for v in auto_tags if v != "unknown"]

    info_json = {
        "uuid": str(uuid.uuid3(uuid.NAMESPACE_DNS, str(output_path))),
        "title": parsed_question["header"]["title"],
        "topic": parsed_question["header"]["topic"],
        "tags": auto_tags,
        "type": "v3",
    }
    info_json.update(
        {
            key: parsed_question["header"][key]
            for key in parsed_question["header"].keys() & optional_keys
        }
    )

    if "workspaceOptions" in info_json: # validate workspaceOptions contains the required keys if it exists
        image = "image" in info_json["workspaceOptions"]
        port = "port" in info_json["workspaceOptions"]
        home = "home" in info_json["workspaceOptions"]
        if not (image and port and home):
            msg = "workspaceOptions must contain image, port, and home keys"
            raise SyntaxError(msg)
        if not isinstance(info_json["workspaceOptions"]["port"], int):
            msg = f"workspaceOptions.port must be an integer, got {type(info_json['workspaceOptions']['port'])!r} instead"
            raise TypeError(msg)

    if modified_time:
        info_json["comment"] = {"lastModified": modified_time}

    # End add tags
    with pathlib.Path(output_path / "info.json").open("w") as output_file:
        json.dump(info_json, output_file, indent=4)


def assemble_server_py(parsed_question, location):
    """Assembles a string version of the server.py file from the YAML header of the md file.

    Args:
        parsed_question (dict): dictionary that is created upon reading of the md problem.
        location (string): 'local' or 'prairielearn' ; the import statements are different depending on if it's local or on a PL server.
    """

    server_dict = parsed_question["header"]["server"].copy()

    if location == "local":
        # This is needed to run this locally compared to when it gets run on a PL server
        server_dict["imports"] = parsed_question["header"]["server"]["imports"].replace(
            "import prairielearn as pl",
            "import problem_bank_scripts.prairielearn as pl",
        )

    if "import problem_bank_helpers as pbh" not in server_dict["imports"]:
        server_dict["imports"] += "\nimport problem_bank_helpers as pbh # Added in by problem bank scripts" 

    server_py = ""

    server_py += server_dict.get("imports", "") + "\n"

    for function, code in server_dict.items():
        indented_code = code.replace("\n", "\n    ")
        # With the custom header, add functions to server.py as-is
        if function == "custom":
            server_py += f"{code}"
        elif function == "imports":
            continue
        else:
            if code:
                server_py += f"def {function}(data):\n    {indented_code}\n"
        if location == "prairielearn" and function == "generate":
            server_py += """\
    # Start code added automatically by problem_bank_scripts

    # Convert backticks to code blocks/fences in answer choices.
    pbh.backticks_to_code_tags(data2)

    # Update data with data2
    data.update(data2)

    # End code added in by problem bank scripts

"""

    return server_py


def write_server_py(output_path, parsed_question):
    """Writes the server.py file to disk
    Args:
        output_path ([type]): [description]
        parsed_question ([type]): [description]
    """

    output_path = pathlib.Path(output_path)

    server_file = assemble_server_py(parsed_question, "prairielearn")

    # Deal with path differences when using PL
    server_file = server_file.replace(
        'read_csv("', 'read_csv(data["options"]["client_files_course_path"]+"/'
    )

    # Write server.py
    (output_path / "server.py").write_text(server_file, encoding="utf8")


def validate_multiple_choice(part_name, parsed_question, data_dict):
    """Validates a markdown format multiple-choice question
    
    Args:
        part_name (string): Name of the question part being processed (e.g., part1, part2, etc...)
        parsed_question (dict): Dictionary of the MD-parsed question (output of `read_md_problem`)
        data_dict (dict): Dictionary of the `data` dict created after running server.py using `exec()`

    Returns:
        bool: True if the question is valid, False otherwise.
    """

    def validate_ans(val, ans_name, part_name):
        try:
            json.dumps(val)
            if not isinstance(val, bool):
                msg = (
                    f"Object of type {val.__class__.__name__!r} for the value of correct for {ans_name!r} of {part_name!r} is not a boolean value."
                    "\n Implicitly relying on truthiness of the value is not recommended. Please use `True` or `False`."
                )
                warnings.warn(msg, SyntaxWarning)
            return bool(val)
        except TypeError as err:
            msg = f"Object of type {val.__class__.__name__!r} is not valid for the correct key for answer {ans_name!r} of {part_name!r}."
            raise TypeError(msg) from err

    if any(
        validate_ans(ans["correct"], key, part_name)
        for key, ans in data_dict["params"][f"{part_name}"].items()
        if "ans" in key
    ):
        return True

    none_of_the_above = parsed_question["header"][part_name]["pl-customizations"].get("none-of-the-above", "false")

    return none_of_the_above in {"correct", "random"}


def remove_correct_answers(data2_dict):
    """Magical recursive function that removes particular keys from a nested dictionary: https://stackoverflow.com/a/29652561/2217577

    Args:
        data2_dict (dict): Dictionary (nested) from which to remove key:value

    Returns:
        data2_dict (dict): Dictionary with the offending keys removed
    """

    # This was adapted from this SO: https://stackoverflow.com/a/29652561/2217577
    def gen_dict_extract(key_to_remove, dict_object):
        if hasattr(dict_object, "items"):
            for k, v in list(dict_object.items()):
                if key_to_remove in k:
                    dict_object.pop(k, None)
                if isinstance(v, dict):
                    for result in gen_dict_extract(key_to_remove, v):
                        yield result
                elif isinstance(v, list):
                    for d in v:
                        for result in gen_dict_extract(key_to_remove, d):
                            yield result

    list(gen_dict_extract("correct", data2_dict))

    return data2_dict


_ATTRIBUTIONS: dict[str, str] = json.loads(importlib.resources.files("problem_bank_scripts").joinpath("attributions.json").read_bytes())
KNOWN_ATTRIBUTIONS: list[str] = list(_ATTRIBUTIONS.keys())

def process_attribution(attribution):
    """Takes in a string and returns the HTML for the attribution

    Args:
        attribution (string): One of a set of pre-defined values corresponding to a particular attribution.

    Returns:
        string (str): returns the html of the attribution
    """

    try:
        return _ATTRIBUTIONS[attribution]

    except KeyError:
        print(
            f"`Attribution` value of {attribution} is not recognized.",
            f"Currently, the only possible values are: {KNOWN_ATTRIBUTIONS}.",
            "You need to update your md file and fix the `attribution` in the header",
        )
        raise


[docs] def process_question_md( source_filepath: os.PathLike[str] | str, output_path: os.PathLike[str] | str | None = None, instructor: bool = False, ) -> None: """Processes an OPB markdown file and writes the output to a file. Args: source_filepath (os.PathLike[str] | str): Path to the markdown file to be processed. output_path (os.PathLike[str] | str, optional): Path to the output file. Defaults to None. instructor (bool, optional): Flag to determine if the output is for an instructor or not. Defaults to False. This determines if the solutions are included in the output or not. """ try: source_filepath = pathlib.Path(source_filepath).resolve(strict=True) except: print(f"{source_filepath} - File does not exist.") raise if output_path is None: path_replace = "output/instructor" if instructor else "output/public" if "source" in (_src := str(source_filepath)): output_path = pathlib.Path(_src.replace("source", path_replace)) else: msg = f"Check the source filepath; it does not have 'source' in it: {source_filepath}" raise ValueError(msg) else: ## TODO: Make this a bit more robust, perhaps by switching encodings!? output_path = pathlib.Path(output_path) # deal with multi-line strings in YAML Dump ## Code copied from here: https://stackoverflow.com/a/33300001/2217577 def str_presenter(dumper, data2): if len(data2.splitlines()) > 1: # check for multiline string # data2 = re.sub('\\n[\s].*\\n','\n\n',data2) # THIS IS WRONG!!! data2 = re.sub( r"\n\s+\n", "\n\n", data2 ) # # Try \s{3,} for three or more spaces return dumper.represent_scalar("tag:yaml.org,2002:str", data2, style="|") return dumper.represent_scalar("tag:yaml.org,2002:str", data2) yaml.add_representer(str, str_presenter) parsed_q = read_md_problem(source_filepath) header = parsed_q["header"] body_parts = parsed_q["body_parts"] ################################################################################# # Run the python code; this improved way was suggested by Phil Austin of UBC EOAS server_py = assemble_server_py(parsed_q, "local") server = {} data2 = pbh.create_data2() with tempfile.TemporaryDirectory(suffix=f"_{output_path.stem}", ignore_cleanup_errors=True) as dirpath: file = pathlib.Path(dirpath).joinpath("server.py") file.write_text(server_py, encoding="utf8") try: code = compile(server_py, file.as_posix(), "exec") exec(code, server) server["generate"](data2) except Exception as e: msg = f"Error in running the server code, please review the below traceback: \n\n{traceback.format_exc()}" raise type(e)(msg) from None ################################################################################# # Remove the solutions from the server section if instructor is False: # Remove python solution from the public view header.pop("server", None) # Remove correct answers from the data2 dict data2_sanitized = defdict_to_dict(data2, {}) data2_sanitized = defdict_to_dict(remove_correct_answers(data2_sanitized), {}) # Update the YAML header to add substitutions header.update({"myst": {"substitutions": data2_sanitized} }) # Update the YAML header to add substitutions, unsort it, and process for file header_yml = yaml.dump(header, sort_keys=False, allow_unicode=True) # Write the YAML to a file output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text( "---\n" + header_yml + "---\n" + dict_to_md( body_parts, remove_keys=[ "Rubric", "Solution", "Comments", "pl-submission-panel", "pl-answer-panel", ], ) + "\n## Attribution\n\n" + process_attribution(header.get("attribution")), encoding="utf8", ) else: # Update the YAML header to add substitutions header.update({"myst": {"substitutions": defdict_to_dict(data2, {})}}) # return {'header':header, # 'body_parts':body_parts, # 'output_path':output_path} # Update the YAML header to add substitutions, unsort it, and process for file header_yml = yaml.dump(header, sort_keys=False, allow_unicode=True) # Write the YAML to a file output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text( "---\n" + header_yml + "\n---\n" + dict_to_md(body_parts) + "\n## Attribution\n\n" + process_attribution(header.get("attribution")), encoding="utf8", ) # Create the file errors list os_errors = [] # Move client assets (generally images) files_to_copy = header.get("assets") if files_to_copy: pl_path = output_path.parent for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move server assets files_to_copy = header.get("serverFiles") if files_to_copy and instructor: pl_path = output_path.parent for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move autograde py test files files_to_copy = header.get("autogradeTestFiles") if files_to_copy: pl_path = output_path.parent / "tests" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: if file != "starter_code.py" and not instructor: continue try: copy2(pathlib.Path(source_filepath).parent / "tests" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move workspace files files_to_copy = header.get("workspaceFiles") if files_to_copy: pl_path = output_path.parent / "workspace" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / "workspace" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move workspace files files_to_copy = header.get("workspaceTemplates") if files_to_copy: pl_path = output_path.parent / "workspaceTemplates" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / "workspaceTemplates" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) if os_errors: error_msg = "\n ".join(os_errors) raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), f"Error(s) copying specified files:\n {error_msg}")
[docs] def process_question_pl( source_filepath: os.PathLike[str] | str, output_path: os.PathLike[str] | str | None = None, dev: bool = False, ): """Processes an OPB markdown file and converts it to a prairielearn compatible question. Args: source_filepath (os.PathLike[str] | str): Path to the markdown file to be processed. output_path (os.PathLike[str] | str, optional): Path to the output file. Defaults to None. dev (bool, optional): Flag to determine if the question is under development. Defaults to False. """ try: _path = pathlib.Path(source_filepath).resolve() except: print(f"{source_filepath} - File does not exist.") raise if not _path.exists(): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), source_filepath) if not _path.is_file(): raise IsADirectoryError(errno.EISDIR, os.strerror(errno.EISDIR), source_filepath) if output_path is None: if "source" in str(_path): output_path = pathlib.Path(_path.as_posix().replace("source", "output/prairielearn")).parent else: msg = f"Check the source filepath; it does not have 'source' in it: {source_filepath}" raise ValueError(msg) else: ## TODO: It's annoying that here output_path.parent is used, but for md problems, it's just output_path output_path = pathlib.Path(output_path).parent # Parse the MD file parsed_q = read_md_problem(source_filepath) parsed_q["header"]["topic"] = topics[ parsed_q["header"]["topic"] ] # Add integer topic id, this is safe because we validated the header in read_md_problem # Create output dir if it doesn't exist output_path.mkdir(parents=True, exist_ok=True) ################################################################################# # Run the python code; this improved way was suggested by Phil Austin of UBC EOAS server_py = assemble_server_py(parsed_q, "local") server = {} data2 = pbh.create_data2() with tempfile.TemporaryDirectory(suffix=f"_{output_path.stem}", ignore_cleanup_errors=True) as dirpath: file = pathlib.Path(dirpath).joinpath("server.py") file.write_text(server_py, encoding="utf8") try: code = compile(server_py, file.as_posix(), "exec") exec(code, server) server["generate"](data2) except Exception as e: msg = f"Error in running the server code, please review the below traceback: \n\n{traceback.format_exc()}" raise type(e)(msg) from None ################################################################################# if dev: tags = parsed_q["header"].get("tags", []) tags.append("DEV") parsed_q["header"]["tags"] = tags try: repo = git.Repo(_path.parent, search_parent_directories=True) repo.working_dir commit = next(repo.iter_commits(None, source_filepath, max_count=1)) modified_time = commit.committed_datetime except: modified_time = datetime.datetime.fromtimestamp(_path.stat().st_mtime, tz=datetime.timezone.utc) # Write info.json file write_info_json(output_path, parsed_q, modified_time.strftime("%Y-%m-%dT%H:%M:%S%z")) # Question Preamble preamble = parsed_q["body_parts"].get("preamble", None) # TODO: Remove Debugging print statement # print(f"premable: {preamble}") if preamble: question_html = f"<pl-question-panel>\n<markdown>\n{ preamble }\n</markdown>\n</pl-question-panel>\n\n" else: question_html = "" # Useful info panel if (useful_info := parsed_q["body_parts"].get("Useful_info", None)): useful_info = useful_info.replace("## Useful Info\n", "") question_html += f"""<pl-hidden-hints> <pl-hint hint-name="Helpful Information"><markdown>{useful_info}</markdown></pl-hint> </pl-hidden-hints> """ # Single and Multi-part question construction for pnum in range(1, parsed_q["num_parts"] + 1): part = "part" + f"{pnum}" q_type = parsed_q["header"][part]["type"] question_html += f"\n<!-- ######## Start of Part {pnum} ######## -->\n\n" if parsed_q["num_parts"] > 1: question_html += f'<pl-card header="{parsed_q["body_parts_split"][part]["title"]}">\n' if "multiple-choice" in q_type and not validate_multiple_choice(part,parsed_q,data2): msg = ( f"Multiple choice question {part} does not have a correct answer and " "the pl-customization `none-of-the-above` was not set to `correct` or `random`." ) raise ValueError(msg) converter = INPUT_TYPE_PROCESSORS.get(q_type) if converter is None: msg = f"The question type ({q_type}) is not yet implemented." raise NotImplementedError(msg) else: question_html += f"{converter(part,parsed_q,data2)}" if parsed_q["num_parts"] > 1: question_html += "</pl-card>\n" # Add pl-submission-panel and pl-answer-panel (if they exist) subm_panel = parsed_q["body_parts_split"][part].get("pl-submission-panel", None) q_panel = parsed_q["body_parts_split"][part].get("pl-answer-panel", None) if subm_panel: question_html += f"\n<pl-submission-panel>{subm_panel}</pl-submission-panel>\n" if q_panel: question_html += f"\n<pl-answer-panel>{q_panel}</pl-answer-panel>\n" question_html += f"\n<!-- ######## End of Part {pnum} ######## -->\n" # Add Attribution question_html += f"\n<pl-question-panel>\n<markdown>\n---\n{process_attribution(parsed_q['header'].get('attribution'))}\n</markdown>\n</pl-question-panel>\n" # Fix Latex over-escaping from mdformat (i.e. _, [, and ]being replaced with \_, \[, and \]) # See https://github.com/open-resources/problem_bank_scripts/issues/89 # Also see https://github.com/open-resources/problem_bank_scripts/pull/92 question_html = question_html.replace("\\_", "_").replace("\\[","[").replace("\\]","]") question_html = question_html.replace("\\*", "*").replace("\\<","<").replace("\\`","`") # Final pre-processing question_html = pl_image_path(question_html) # Write question.html file (output_path / "question.html").write_text(question_html, encoding="utf8") ### TODO solve the issue with the latex escape sequences, this is a workaround # with open((output_path / "question.html"), "w") as qfile: # print(f"{question_html}", file=qfile) # Write server.py file write_server_py(output_path, parsed_q) # Create the file errors list os_errors = [] # Move client assets (generally images) files_to_copy = parsed_q["header"].get("assets") if files_to_copy: pl_path = output_path / "clientFilesQuestion" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move server assets files_to_copy = parsed_q["header"].get("serverFiles") if files_to_copy: pl_path = output_path / "serverFilesQuestion" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move autograde py test files files_to_copy = parsed_q["header"].get("autogradeTestFiles") if files_to_copy: pl_path = output_path / "tests" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / "tests" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move workspace files files_to_copy = parsed_q["header"].get("workspaceFiles") if files_to_copy: pl_path = output_path / "workspace" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / "workspace" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(str(e)) # Move workspace files files_to_copy = parsed_q["header"].get("workspaceTemplates") if files_to_copy: pl_path = output_path / "workspaceTemplates" pl_path.mkdir(parents=True, exist_ok=True) for file in files_to_copy: try: copy2(pathlib.Path(source_filepath).parent / "workspaceTemplates" / file, pl_path / file) except (FileExistsError, FileNotFoundError, IsADirectoryError, PermissionError) as e: os_errors.append(f"{e} {e.filename}") if os_errors: error_msg = "\n ".join(os_errors) raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), f"Error(s) copying specified files:\n {error_msg}")
def pl_image_path(html): """Adds ``{{options.client_files_question_url}}`` directory before the path automatically""" ext_group = r"((?!http).*\.(?:png|gif|jpg|jpeg))" base_repl = r"{{options.client_files_question_url}}/\1" # If image files are included as markdown format, add {{options.client_files_question_url}} res = re.sub(rf"\({ext_group}\)", rf"({base_repl})", html) # If image files are included as html format, add {{options.client_files_question_url}} return re.sub(rf"src[\s,=]*\"{ext_group}", f'src="{base_repl}', res) def validate_header(header_dict): # check if topic is valid (i.e. from the list of topics in the learning_outcomes repo for this subject) if topics.get(topic := header_dict["topic"], None) is None: msg = f"topic '{topic}' is not listed in the learning outcomes" raise ValueError(msg)