
MySQL deadlock when using DAG serialization #8521

Closed
michalmisiewicz opened this issue Apr 22, 2020 · 11 comments
Labels
kind:bug This is clearly a bug

Comments

michalmisiewicz (Contributor) commented Apr 22, 2020

Apache Airflow version: 1.10.10
Kubernetes version: v1.16.8
MySQL version: 5.7

What happened:
Airflow tasks fail with a MySQL deadlock when running a DAG with max_active_runs > 1 and concurrency > 1 while DAG serialization is enabled.
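
A minimal DAG meeting these conditions looks roughly like this (a sketch; the dag id, schedule, and task count are made up for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# With these settings, several runs and several tasks can write to
# rendered_task_instance_fields for the same (dag_id, task_id) concurrently.
with DAG(
    dag_id="deadlock_repro",            # hypothetical dag id
    start_date=datetime(2019, 12, 1),
    schedule_interval="@daily",
    max_active_runs=4,                  # > 1, as described above
    concurrency=8,                      # > 1, as described above
    catchup=True,                       # backfills several runs at once
) as dag:
    tasks = [DummyOperator(task_id="task_%d" % i) for i in range(8)]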

Logs

[2020-04-22 19:19:49,018] {taskinstance.py:1145} ERROR - (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)]
[parameters: ('some_dag_v.0.0.1', 'some_task_id', datetime.datetime(2019, 12, 2, 0, 0), 'Some rendered fields (837 characters truncated)')]

(Background on this error at: http://sqlalche.me/e/e3q8)
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 590, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1002, in _run_raw_task
self.refresh_from_db(lock_for_update=True)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/contextlib.py", line 119, in exit
next(self.gen)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 45, in create_session
session.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2496, in flush
self._flush(objects)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2637, in _flush
transaction.rollback(capture_exception=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in exit
exc_value, with_traceback=exc_tb,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise

raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
flush_context.execute()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 984, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1103, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1288, in execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1482, in handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from
=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise

raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 590, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)

michalmisiewicz added the kind:bug label Apr 22, 2020

zorseti commented Jun 14, 2020

I encountered the same problem. Has it been fixed yet?
The following two SQL statements report the error:
ERROR - (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)
SQL: DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (SELECT subq1.dag_id, subq1.task_id, subq1.execution_date FROM (SELECT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id, rendered_task_instance_fields.execution_date AS execution_date FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s ORDER BY rendered_task_instance_fields.execution_date DESC LIMIT %s) AS subq1)
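
Most likely these two statements contend on the same index range of rendered_task_instance_fields: the DELETE with the NOT IN subquery takes next-key (gap) locks on the rows it scans, which can block a concurrent INSERT into the same range. On MySQL 5.7 the two sides of such a conflict can be inspected while it is happening (a diagnostic sketch using the information_schema tables):

SELECT r.trx_id              AS waiting_trx,
       r.trx_mysql_thread_id AS waiting_thread,
       r.trx_query           AS waiting_query,
       b.trx_id              AS blocking_trx,
       b.trx_mysql_thread_id AS blocking_thread,
       b.trx_query           AS blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id;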

ghostbody commented

Same problem +1

ft2898 commented Jul 27, 2020

Same problem +1
Apache Airflow version: 1.10.11

ozw1z5rd commented
Hi, I have the same issue.

I was looking at the models/renderedtifields.py file and noticed that

def delete_old_records(

contains a line that loads the number of rendered fields to keep:

num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0)

and, if this value is <= 0, the function returns without doing anything:

 if num_to_keep <= 0:
     return

Since the deadlock involves the insert and the delete on that table, I thought setting max_num_rendered_ti_fields_per_task = 0 in the [core] config might fix the issue.

Of course, it does not work.

Using SHOW ENGINE INNODB STATUS, I see queries like this:

DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = 'PARTITIONADD'
  AND rendered_task_instance_fields.task_id = 'partition_add'
  AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (
        SELECT subq1.dag_id, subq1.task_id, subq1.execution_date
        FROM (
              SELECT rendered_task_instance_fields.dag_id AS dag_id,
                     rendered_task_instance_fields.task_id AS task_id,
                     rendered_task_instance_fields.execution_date AS execution_date
              FROM rendered_task_instance_fields
              WHERE rendered_task_instance_fields.dag_id = 'PARTITIONADD'
                AND rendered_task_instance_fields.task_id = 'partition_add'
              ORDER BY rendered_task_instance_fields.execution_date DESC
              LIMIT 30
        ) AS subq1
  )

Note the LIMIT 30.

I found this code inside models/taskinstance.py:

 if STORE_SERIALIZED_DAGS:
     RTIF.write(RTIF(ti=self, render_templates=False), session=session)
     RTIF.delete_old_records(self.task_id, self.dag_id, session=session)

and it is the only place where delete_old_records is called, which is odd: where does that "30" come from?

I'll investigate further tomorrow...

ozw1z5rd commented Jul 31, 2020

Setting max_num_rendered_ti_fields_per_task = 0 seems to have fixed my problems. Of course, this can only be a temporary fix. I moved the table cleanup to an external task.
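
For reference, the workaround in config form (a sketch; the environment-variable spelling follows Airflow's AIRFLOW__{SECTION}__{KEY} convention):

# airflow.cfg
[core]
# 0 makes delete_old_records() return early, so the contended
# DELETE ... NOT IN (...) statement is never issued.
max_num_rendered_ti_fields_per_task = 0

# or, equivalently, as an environment variable:
# AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0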

honarkhah (Contributor) commented Aug 17, 2020

In my case the issue is about concurrent DAG runs; the insertion issue was fixed in Airflow 1.10.11 (fixed PR).

Steps to reproduce:

  • Trigger more than 1 instance of a dag
  • Have more than 1 worker running at the same time

If X tasks of a DAG run at the same time, (X - 1) of them hit the lock and only one of them succeeds; a sketch for queueing several runs at once is below, followed by one failing worker's log.
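
A quick way to queue several runs of the same DAG at once (a sketch using the 1.10.x experimental API; the dag id is taken from the log below, and the run ids are made up):

from datetime import timedelta

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.utils import timezone

# Use distinct execution dates so the runs do not collide on the unique key.
now = timezone.utcnow()
for i in range(3):
    trigger_dag(
        "logo_detection_webhook",
        run_id="manual_repro_%d" % i,   # hypothetical run ids
        execution_date=now + timedelta(seconds=i),
    )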

[2020-08-17 10:42:30,177] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2020-08-17 10:42:30,187] {taskinstance.py:901} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2020-08-17T10:34:24.115521+00:00
[2020-08-17 10:42:30,190] {standard_task_runner.py:54} INFO - Started process 553 to run task
[2020-08-17 10:42:30,212] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'logo_detection_webhook', 'branch_task', '2020-08-17T10:34:24.115521+00:00', '--job_id', '169070', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/model_training/logo_detection_webhook.py', '--cfg_path', '/tmp/tmprvq51le4']
[2020-08-17 10:42:30,213] {standard_task_runner.py:78} INFO - Job 169070: Subtask branch_task
[2020-08-17 10:42:30,251] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: logo_detection_webhook.branch_task 2020-08-17T10:34:24.115521+00:00 [running]> 91a09a3997d3
[2020-08-17 10:43:21,389] {taskinstance.py:1150} ERROR - (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (SELECT subq1.dag_id, subq1.task_id, subq1.execution_date 
FROM (SELECT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id, rendered_task_instance_fields.execution_date AS execution_date 
FROM rendered_task_instance_fields 
WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s ORDER BY rendered_task_instance_fields.execution_date DESC 
 LIMIT %s) AS subq1)]
[parameters: ('logo_detection_webhook', 'branch_task', 'logo_detection_webhook', 'branch_task', 30)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

rtjarvis commented Sep 4, 2020

Is this issue still present in 1.10.12?

zorseti commented Sep 10, 2020

> Is this issue still present in 1.10.12?

It was fixed in 1.10.12:
https://github.com/apache/airflow/pull/9993/files
But I am not sure that fix works; I fixed it myself by changing the subquery in the DELETE.
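
One way to "change the subquery" is to materialize the keep-set first and then delete by a simple range predicate, so the DELETE no longer scans and locks rows through a self-referencing NOT IN subquery. A sketch (the dag/task ids and cutoff date are illustrative, and this is not necessarily what the linked PR does):

-- Step 1: find the oldest execution_date among the newest 30 rows.
SELECT execution_date
FROM rendered_task_instance_fields
WHERE dag_id = 'some_dag' AND task_id = 'some_task'
ORDER BY execution_date DESC
LIMIT 30;

-- Step 2: delete everything older than that date (value taken from step 1).
DELETE FROM rendered_task_instance_fields
WHERE dag_id = 'some_dag'
  AND task_id = 'some_task'
  AND execution_date < '2020-08-01 00:00:00';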

potiuk added this to the Airflow 1.10.12 milestone Sep 10, 2020
potiuk (Member) commented Sep 10, 2020

I will mark it as fixed then, @zorseti. We can always re-open it if we still see it happening :)

guptaneha0908 commented
@honarkhah May I know how you fixed the concurrent-tasks-under-SubDagOperator issue? I am still facing it. I have raised a similar question here: https://stackoverflow.com/questions/70263466/airflow-1-10-14-subdagoperator-failing-with-deadlock-issue
