Source code for pretalx.submission.domain.queries.question

# SPDX-FileCopyrightText: 2025-present Tobias Kunze
# SPDX-License-Identifier: AGPL-3.0-only WITH LicenseRef-Pretalx-AGPL-3.0-Terms

from django.db.models import Count, Exists, OuterRef, Q

from pretalx.orga.rules import can_view_speaker_names
from pretalx.person.models import SpeakerProfile
from pretalx.person.rules import is_reviewer
from pretalx.submission.enums import QuestionTarget, QuestionVariant
from pretalx.submission.models import Answer, Submission


def active_questions(
    event,
    *,
    target=QuestionTarget.SUBMISSION,
    track=None,
    submission_type=None,
    for_reviewers=False,
    skip_limited=False,
):
    """Active questions in a given answering scope.

    ``target=None`` includes every non-reviewer target.
    ``skip_limited=True`` drops any question restricted to specific tracks or
    submission types (and ignores ``track`` / ``submission_type``).
    """
    queryset = event.questions(manager="all_objects").filter(active=True)
    if target:
        queryset = queryset.filter(target=target)
    else:
        queryset = queryset.exclude(target=QuestionTarget.REVIEWER)
    if skip_limited:
        queryset = queryset.filter(tracks__isnull=True, submission_types__isnull=True)
    else:
        if track:
            queryset = queryset.filter(Q(tracks__in=[track]) | Q(tracks__isnull=True))
        if submission_type:
            queryset = queryset.filter(
                Q(submission_types__in=[submission_type])
                | Q(submission_types__isnull=True)
            )
    if for_reviewers:
        queryset = queryset.filter(is_visible_to_reviewers=True)
    return (
        queryset.select_related("event")
        .prefetch_related("options")
        .order_by("-target", "position", "id")
    )


def questions_for_user(event, user):
    """Questions the user can see, with team-access filtering.

    A reviewer without submission edit perms is treated as a reviewer here
    regardless of any other perms (e.g. event settings) they may also hold,
    so they never see questions hidden from reviewers in review/listing views.
    """
    restricted_reviewer = (
        not user.is_anonymous
        and not user.has_perm("submission.orga_update_submission", event)
        and is_reviewer(user, event)
    )
    if restricted_reviewer and can_view_speaker_names(user, event):
        queryset = event.questions(manager="all_objects").filter(
            Q(is_visible_to_reviewers=True) | Q(target=QuestionTarget.REVIEWER),
            active=True,
        )
    elif restricted_reviewer:
        # Anonymised review phase: reviewers see only public questions
        queryset = event.questions.all().filter(is_public=True)
    elif user.has_perm("submission.update_question", event):
        # Organisers with edit permissions can see everything
        queryset = event.questions(manager="all_objects").all()
    elif user.has_perm("submission.orga_list_question", event):
        # Other team members can either view all active questions
        # or only questions open to reviewers
        queryset = event.questions(manager="all_objects").all()
    elif user.has_perm("submission.list_question", event):
        # Anonymous and low-perm users see public questions only
        queryset = event.questions.all().filter(is_public=True)
    else:
        return event.questions.none()

    if user.is_anonymous:
        team_filter = Q(limit_teams__isnull=True)
    else:
        team_filter = Q(limit_teams__isnull=True) | Q(limit_teams__in=user.teams.all())
    return queryset.filter(team_filter).select_related("event", "event__cfp").distinct()


def filter_submissions_by_question(
    qs, *, question=None, answer=None, option=None, unanswered=False
):
    """Filter a submission queryset by their answers to a question.

    ``option`` and ``answer`` privilege a positive match (returning
    submissions whose answer matches). ``unanswered=True`` is honoured
    only when neither is set, returning submissions with no answer to
    the question. Without a ``question``, the queryset is returned
    unchanged.
    """
    if not question:
        return qs
    answers = Answer.objects.filter(submission_id=OuterRef("pk"), question_id=question)
    if option:
        return qs.filter(Exists(answers.filter(options=option)))
    if answer:
        return qs.filter(Exists(answers.filter(answer__exact=answer)))
    if unanswered:
        return qs.filter(~Exists(answers))
    return qs


def missing_questions_for_speaker(*, speaker, submissions, questions):
    """Questions ``speaker`` hasn't fully answered for ``submissions``."""
    speaker_submissions = list(submissions.filter(speakers=speaker))
    submission_questions = [
        q for q in questions if q.target == QuestionTarget.SUBMISSION
    ]
    speaker_questions = [q for q in questions if q.target == QuestionTarget.SPEAKER]

    missing = []
    if submission_questions and speaker_submissions:
        answers_by_pair = {
            (a.question_id, a.submission_id): a
            for a in Answer.objects.filter(
                question__in=submission_questions, submission__in=speaker_submissions
            )
            .select_related("question")
            .prefetch_related("options")
        }
        for question in submission_questions:
            for submission in speaker_submissions:
                answer = answers_by_pair.get((question.pk, submission.pk))
                if not answer or not answer.is_answered:
                    missing.append(question)

    if speaker_questions:
        answers_by_question = {
            a.question_id: a
            for a in Answer.objects.filter(
                question__in=speaker_questions, speaker=speaker
            )
            .select_related("question")
            .prefetch_related("options")
        }
        for question in speaker_questions:
            answer = answers_by_question.get(question.pk)
            if not answer or not answer.is_answered:
                missing.append(question)
    return missing


[docs] def count_missing_answers(question, *, filter_speakers=None, filter_talks=None): """How many answers are missing for ``question``. Only meaningful for submission and speaker questions; reviewer questions return ``0``. Pass ``filter_speakers`` or ``filter_talks`` (querysets) to restrict the scope. """ answers = question.answers.all() filter_talks = filter_talks or Submission.objects.none() filter_speakers = filter_speakers or SpeakerProfile.objects.none() if filter_speakers or filter_talks: answers = answers.filter( Q(speaker__in=filter_speakers) | Q(submission__in=filter_talks) ) answer_count = answers.count() if question.target == QuestionTarget.SUBMISSION: submissions = filter_talks or question.event.submissions.all() return max(submissions.count() - answer_count, 0) if question.target == QuestionTarget.SPEAKER: speakers = filter_speakers or question.event.submitters return max(speakers.count() - answer_count, 0) return 0
def question_answer_summary(*, question, talks, speakers): """Counts and grouped answers for ``question`` over a filtered scope.""" answers = ( question.answers.filter(Q(speaker__in=speakers) | Q(submission__in=talks)) .order_by("pk") .distinct() ) if question.variant in (QuestionVariant.CHOICES, QuestionVariant.MULTIPLE): grouped_answers = ( answers.order_by("options") .values("options", "options__answer") .annotate(count=Count("id")) .order_by("-count") ) elif question.variant == QuestionVariant.FILE: grouped_answers = [{"answer": answer, "count": 1} for answer in answers] else: grouped_answers = ( answers.order_by("answer") .values("answer") .annotate(count=Count("id")) .order_by("-count") ) return { "answer_count": answers.count(), "missing_answers": count_missing_answers( question, filter_speakers=speakers, filter_talks=talks ), "grouped_answers": grouped_answers, } def answers_for_user(event, user): """Answers to questions the user can see, with related fields prefetched. Sites that already have a constrained Answer queryset (e.g. a submission's own answers) should filter on ``question__in=questions_for_user(...)`` directly instead. """ return ( Answer.objects.filter(question__in=questions_for_user(event, user)) .select_related("question", "question__event", "submission", "speaker") .prefetch_related("options") ) def answers_for_speaker(speaker): """All answers given by a speaker on their event. Includes both speaker-target answers (given for the speaker themselves) and submission-target answers on submissions they speak at, ordered by question position for stable rendering. """ return Answer.objects.filter( Q(submission__in=speaker.submissions.all()) | Q(speaker=speaker) ).order_by("question__position") def public_answers_for_speaker(speaker): """Public-facing speaker-target answers for ``speaker``'s profile page. Mirrors :func:`public_answers_for_submission`: only public, speaker-target answers on the speaker's event, ordered by question position. """ return ( speaker.answers.filter( question__is_public=True, question__event=speaker.event, question__target=QuestionTarget.SPEAKER, ) .select_related("question") .order_by("question__position") ) def public_answers_for_submission(submission): """Public-facing submission answers, filtered to the submission's track and submission type. Honours per-question track/submission-type restrictions: questions limited to other tracks or types are dropped. Used by the public talk page. """ qs = submission.answers.filter( Q(question__submission_types__in=[submission.submission_type]) | Q(question__submission_types__isnull=True), question__is_public=True, question__event=submission.event, question__target=QuestionTarget.SUBMISSION, ) if submission.track: qs = qs.filter( Q(question__tracks__in=[submission.track]) | Q(question__tracks__isnull=True) ) return qs.select_related("question").order_by("question__position")