[go: nahoru, domu]

Skip to content

Commit

Permalink
Drop Python 3.6 compatibility objects/modules (#24048)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1dccaad)
  • Loading branch information
Taragolis authored and ephraimbuddy committed Jul 5, 2022
1 parent a2968ff commit 9c5a07d
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 49 deletions.
28 changes: 0 additions & 28 deletions airflow/compat/asyncio.py

This file was deleted.

5 changes: 2 additions & 3 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

from sqlalchemy import func

from airflow.compat.asyncio import create_task
from airflow.configuration import conf
from airflow.jobs.base_job import BaseJob
from airflow.models.trigger import Trigger
Expand Down Expand Up @@ -236,7 +235,7 @@ async def arun(self):
The loop in here runs trigger addition/deletion/cleanup. Actual
triggers run in their own separate coroutines.
"""
watchdog = create_task(self.block_watchdog())
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
# Run core logic
Expand All @@ -263,7 +262,7 @@ async def create_triggers(self):
trigger_id, trigger_instance = self.to_create.popleft()
if trigger_id not in self.triggers:
self.triggers[trigger_id] = {
"task": create_task(self.run_trigger(trigger_id, trigger_instance)),
"task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
"name": f"{trigger_instance!r} (ID {trigger_id})",
"events": 0,
}
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.typing_compat import Literal, RePatternType
from airflow.typing_compat import Literal
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
Expand Down Expand Up @@ -1998,7 +1998,7 @@ def sub_dag(self, *args, **kwargs):

def partial_subset(
self,
task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
task_ids_or_regex: Union[str, re.Pattern, Iterable[str]],
include_downstream=False,
include_upstream=True,
include_direct_upstream=False,
Expand Down Expand Up @@ -2026,7 +2026,7 @@ def partial_subset(
memo = {id(self.task_dict): None, id(self._task_group): None}
dag = copy.deepcopy(self, memo) # type: ignore

if isinstance(task_ids_or_regex, (str, RePatternType)):
if isinstance(task_ids_or_regex, (str, re.Pattern)):
matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
else:
matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
Expand Down
9 changes: 0 additions & 9 deletions airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,3 @@
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
except ImportError:
from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable # type: ignore # noqa


# Before Py 3.7, there is no re.Pattern class
try:
from re import Pattern as RePatternType # type: ignore
except ImportError:
import re

RePatternType = type(re.compile('', 0)) # type: ignore
4 changes: 1 addition & 3 deletions airflow/utils/log/secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from airflow.compat.functools import cache, cached_property

if TYPE_CHECKING:
from airflow.typing_compat import RePatternType

RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]


Expand Down Expand Up @@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker":
class SecretsMasker(logging.Filter):
"""Redact secrets from logs"""

replacer: Optional["RePatternType"] = None
replacer: Optional[re.Pattern] = None
patterns: Set[str]

ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
Expand Down
5 changes: 2 additions & 3 deletions tests/triggers/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import pendulum
import pytest

from airflow.compat.asyncio import create_task
from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -72,7 +71,7 @@ async def test_datetime_trigger_timing():

# Create a task that runs the trigger for a short time then cancels it
trigger = DateTimeTrigger(future_moment)
trigger_task = create_task(trigger.run().__anext__())
trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)

# It should not have produced a result
Expand All @@ -81,7 +80,7 @@ async def test_datetime_trigger_timing():

# Now, make one waiting for an event in the past and do it again
trigger = DateTimeTrigger(past_moment)
trigger_task = create_task(trigger.run().__anext__())
trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)

assert trigger_task.done() is True
Expand Down

0 comments on commit 9c5a07d

Please sign in to comment.