[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop Python 3.6 compatibility objects/modules #24048

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions airflow/compat/asyncio.py

This file was deleted.

5 changes: 2 additions & 3 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

from sqlalchemy import func

from airflow.compat.asyncio import create_task
from airflow.configuration import conf
from airflow.jobs.base_job import BaseJob
from airflow.models.trigger import Trigger
Expand Down Expand Up @@ -236,7 +235,7 @@ async def arun(self):
The loop in here runs trigger addition/deletion/cleanup. Actual
triggers run in their own separate coroutines.
"""
watchdog = create_task(self.block_watchdog())
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
# Run core logic
Expand All @@ -263,7 +262,7 @@ async def create_triggers(self):
trigger_id, trigger_instance = self.to_create.popleft()
if trigger_id not in self.triggers:
self.triggers[trigger_id] = {
"task": create_task(self.run_trigger(trigger_id, trigger_instance)),
"task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
"name": f"{trigger_instance!r} (ID {trigger_id})",
"events": 0,
}
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.typing_compat import Literal, RePatternType
from airflow.typing_compat import Literal
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
Expand Down Expand Up @@ -1957,7 +1957,7 @@ def sub_dag(self, *args, **kwargs):

def partial_subset(
self,
task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
task_ids_or_regex: Union[str, re.Pattern, Iterable[str]],
include_downstream=False,
include_upstream=True,
include_direct_upstream=False,
Expand Down Expand Up @@ -1985,7 +1985,7 @@ def partial_subset(
memo = {id(self.task_dict): None, id(self._task_group): None}
dag = copy.deepcopy(self, memo) # type: ignore

if isinstance(task_ids_or_regex, (str, RePatternType)):
if isinstance(task_ids_or_regex, (str, re.Pattern)):
matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
else:
matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
Expand Down
9 changes: 0 additions & 9 deletions airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,3 @@
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
except ImportError:
from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable # type: ignore # noqa


# Before Py 3.7, there is no re.Pattern class
try:
from re import Pattern as RePatternType # type: ignore
except ImportError:
import re

RePatternType = type(re.compile('', 0)) # type: ignore
4 changes: 1 addition & 3 deletions airflow/utils/log/secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from airflow.compat.functools import cache, cached_property

if TYPE_CHECKING:
from airflow.typing_compat import RePatternType

RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]


Expand Down Expand Up @@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker":
class SecretsMasker(logging.Filter):
"""Redact secrets from logs"""

replacer: Optional["RePatternType"] = None
replacer: Optional[re.Pattern] = None
patterns: Set[str]

ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
Expand Down
5 changes: 2 additions & 3 deletions tests/triggers/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import pendulum
import pytest

from airflow.compat.asyncio import create_task
from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -72,7 +71,7 @@ async def test_datetime_trigger_timing():

# Create a task that runs the trigger for a short time then cancels it
trigger = DateTimeTrigger(future_moment)
trigger_task = create_task(trigger.run().__anext__())
trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)

# It should not have produced a result
Expand All @@ -81,7 +80,7 @@ async def test_datetime_trigger_timing():

# Now, make one waiting for en event in the past and do it again
trigger = DateTimeTrigger(past_moment)
trigger_task = create_task(trigger.run().__anext__())
trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)

assert trigger_task.done() is True
Expand Down