[go: nahoru, domu]

Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Issue 22 | Add workaround for table using table from another DAG #23

Merged
merged 4 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions dashboard/model/tables_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

from dashboard.models import Period, Table
from dashboard.utils import clean_dag_id, handle_resource, simple_state
Expand Down Expand Up @@ -43,7 +44,6 @@ class DagTableProgress:

class TableDataProvider:


@staticmethod
def _format_table_id(table):
id = table['db'] + '.' + table['name']
Expand Down Expand Up @@ -75,8 +75,8 @@ def get_tables(self):
def history(self, table):
return self.__get_detailed_view_data(self.tables[table], DETAILED_CHART_DAYS_NUM)

def get_tables_by_dag(self, dag_name):
return [table for table in self.tables.values() if table.dag_id == dag_name]
def get_tables_by_dag(self, dag_name) -> Dict[str, Table]:
return {id: table for id, table in self.tables.items() if table.dag_id == dag_name}

def get_tables_graph(self, dag_id, execution_date):
name_without_version = clean_dag_id(dag_id)
Expand All @@ -89,7 +89,7 @@ def get_tables_graph(self, dag_id, execution_date):
state=self.airflow.get_dag_state(dag_id, execution_date))

# tables
for table in dag_tables:
for table in dag_tables.values():
yield GraphVertex(
id=table.id,
name=table.name + (' ({})'.format(table.period.name) if table.period else ''),
Expand All @@ -98,7 +98,9 @@ def get_tables_graph(self, dag_id, execution_date):
dag_progress[table.task_id].end_date,
dag_progress[table.task_id].duration
),
parent=table.get_parent()
# workaround for this entire method not being able to reference table managed by other DAG in table.uses
# see https://github.com/Wikia/discreETLy/issues/22
parent='main' if table.uses is None or table.uses not in dag_tables.keys() else table.uses
)

@handle_resource('influx')
Expand Down
2 changes: 0 additions & 2 deletions dashboard/plugins/tables/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ Additionally you may want to provide cadence for how often the table is updated
Update `id` incrementally starting from 1 for each table redeclared with another cadence.
This makes specifying dependency on a particular cadence possible, e.g.: `uses: dbname.frequently_updated.daily`.

If several DAGs use the same table, it needs to be redeclared with each `dag_id` for the table to be supported by
the `Tables managed by DAG` view.

See [example file](tables.yaml.template) for more details on the data structure.

Expand Down
7 changes: 0 additions & 7 deletions dashboard/plugins/tables/tables.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,3 @@
dag_id: my_dag
task_id: rollup_table_insert
uses: dbname.frequently_updated_table.daily

# redeclaring popular_raw_table for use with another DAG
- name: popular_raw_table
db: dbname
dag_id: another_dag
task_id: popular_raw_table_sensor