Deriving Data Lineage from Python Code Using AST and Pyflakes
This article shows how to automatically extract data lineage and code dependencies from large collections of Python scripts. It leverages the language's compilation stages, abstract syntax trees, and the Pyflakes static-analysis library, with practical code examples and a custom parser for SQL extraction.
Developers and data engineers often struggle with hundreds of tables and slow, risky SQL scripts, fearing that a single code change could cause production incidents. Data lineage—tracking how data is created, transformed, and consumed—offers a systematic way to understand these complex relationships.
The article introduces a method for deriving data lineage directly from Python code. It first reviews the CPython compilation pipeline: source code → parser tree → abstract syntax tree (AST) → control‑flow graph → bytecode execution.
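These stages can be observed directly with the standard library. A minimal sketch (the source string is illustrative): `ast.parse` produces the tree, `compile` lowers it to a bytecode code object, and `dis` shows what the interpreter would execute.

```python
import ast
import dis

source = "x = 1 + 2\nprint(x)"

# Parser + AST stages: turn source text into a Module node
tree = ast.parse(source)
print(type(tree).__name__)  # Module

# Compilation stage: lower the AST to a bytecode code object
code = compile(tree, filename="<demo>", mode="exec")

# Inspect the bytecode the interpreter would run
dis.dis(code)
```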
Static analysis tools in IDEs perform the first three steps to detect syntax errors. By reusing this principle, one can parse Python scripts to automatically extract valuable metadata such as external function calls, referenced SQL statements, accessed tables and fields, and transformation logic.
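As a minimal sketch of that idea (the `pd.read_sql` call, table names, and file names below are illustrative, not taken from the article's codebase), `ast.walk` alone is enough to pull imports and call names out of a script without executing it:

```python
import ast

source = '''
import pandas as pd

df = pd.read_sql("select id, name from users", conn)
df.to_csv("users.csv")
'''

tree = ast.parse(source)
imports, calls = [], []
for node in ast.walk(tree):
    if isinstance(node, ast.Import):
        imports.extend(alias.name for alias in node.names)
    elif isinstance(node, ast.Call):
        # Record the trailing name of the call: pd.read_sql -> read_sql
        func = node.func
        if isinstance(func, ast.Attribute):
            calls.append(func.attr)
        elif isinstance(func, ast.Name):
            calls.append(func.id)

print(imports)  # ['pandas']
print(calls)    # ['read_sql', 'to_csv']
```

Because only the AST is inspected, undefined names such as `conn` are harmless here; the code is never run.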
Using the Pyflakes library as a foundation, the article shows how to invoke its API programmatically instead of via the command line. The following snippet demonstrates the basic setup:
from pyflakes import reporter as modReporter
from pyflakes import api
if __name__ == "__main__":
    reporter = modReporter._makeDefaultReporter()
    args = ['C:\\Users\\yzeng\\PycharmProjects\\pythonProject\\flakes']
    warnings = api.checkRecursive(args, reporter)

Pyflakes parses each file into an AST, tokenizes the source, and walks the tree to identify imports, unused variables, and other issues. The article then extends this approach with a custom linkage_Checker class that traverses the AST, prints node types, depths, and positional information, and extracts code fragments based on line/column ranges.
# Borrowing ideas from Pyflakes
import ast
import sys

from pyflakes import checker

class linkage_Checker:
    nodeDepth = 0

    def __init__(self, tree, file_tokens=(), filename='(none)', codestr='none'):
        self._nodeHandlers = {}
        self.codelines = codestr.decode().split('\n')
        self.handleChildren(tree)

    # Traverse the AST
    def handleChildren(self, tree, omit=None):
        for node in checker.iter_child_nodes(tree, omit=omit):
            self.handleNode(node, tree)

    # Process each node
    def handleNode(self, node, parent):
        if node is None:
            return
        self.nodeDepth += 1
        print('-----------------')
        print('Node type: %s' % node.__class__)
        print('Node depth: %s' % self.nodeDepth)
        try:
            fields = '/'.join(node.__class__._fields)
            print('Node fields: %s' % fields)
        except AttributeError:
            print('(no fields)')
        lineno = getattr(node, 'lineno', 0)
        end_lineno = getattr(node, 'end_lineno', 0)
        col_offset = getattr(node, 'col_offset', 0)
        end_col_offset = getattr(node, 'end_col_offset', 0)
        print('Start line: %s' % lineno)
        print('End line: %s' % end_lineno)
        print('Start column: %s' % col_offset)
        print('End column: %s' % end_col_offset)
        if lineno > 0:
            getCodebyposition(self.codelines, lineno, end_lineno, col_offset, end_col_offset)
        try:
            handler = self.getNodeHandler(node.__class__)
            handler(node)
        finally:
            self.nodeDepth -= 1

    def getNodeHandler(self, node_class):
        try:
            return self._nodeHandlers[node_class]
        except KeyError:
            nodeType = checker.getNodeType(node_class)
            self._nodeHandlers[node_class] = handler = getattr(self, nodeType, self._unknown_handler)
            return handler

    def _unknown_handler(self, node):
        self.handleChildren(node)
def check(codestr, filename, reporter=None):
    try:
        tree = ast.parse(codestr, filename=filename)
    except SyntaxError as e:
        print(e.lineno, e.offset, e.text)
        return 0
    file_tokens = checker.make_tokens(codestr)
    w = linkage_Checker(tree, file_tokens=file_tokens, filename=filename, codestr=codestr)
    return 1
def getCodebyposition(codelines, lineno, end_lineno, col_offset, end_col_offset):
    for i in range(lineno, end_lineno + 1):
        if i == lineno and lineno == end_lineno:
            print(codelines[lineno - 1][col_offset:end_col_offset])
        elif i == lineno:
            print(codelines[lineno - 1][col_offset:])
        elif i == end_lineno:
            print(codelines[end_lineno - 1][:end_col_offset])
        else:
            print(codelines[i - 1])
    return 1

With the AST understood, the article shows how to locate specific patterns such as SQL queries embedded in pandas calls. Two helper functions parse the SELECT-FROM clause to retrieve table names and column lists:
# Extract tables and fields from a SQL statement
import re

def getTableField(statement):
    result = {}
    matchObj = re.search(r'select(.*)from(.*)', statement, re.M | re.I)
    if matchObj is not None:
        fields = [field.strip() for field in matchObj.group(1).split(',')]
        table = matchObj.group(2).strip()
        result[table] = fields
    return result
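To make the expected output concrete, here is a standalone variant of the same idea (a hypothetical `table_fields` helper, not from the article); it uses a non-greedy `(.*?)` so the field list stops at the first `from`:

```python
import re

def table_fields(statement):
    # Capture the select list and the table expression after FROM
    m = re.search(r'select(.*?)from(.*)', statement, re.I | re.S)
    if m is None:
        return {}
    fields = [f.strip() for f in m.group(1).split(',')]
    return {m.group(2).strip(): fields}

print(table_fields("select id, name from users"))
# {'users': ['id', 'name']}
```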
# Simple SQL parser used by the checker
def sqlparse(sql_str):
    sql_str = sql_str.replace('SELECT', 'select')
    sql_str = sql_str.replace('WHERE', 'where')
    sql_str = sql_str.replace('FROM', 'from')
    re_skip_detail = re.compile("([a-zA-Z0-9]+)")
    tmp = re_skip_detail.split(sql_str)
    select_index = from_index = 0
    parse_result = []
    for index, item in enumerate(tmp):
        if item in ('select', 'where'):
            if from_index > 0:
                statement = ''.join(tmp[select_index:index])
                if len(statement) > 0:
                    table_fields = getTableField(statement)
                    parse_result.append(table_fields)
            from_index = 0
        if item == 'select':
            select_index = index
        elif item == 'from':
            from_index = index
    # Flush the final statement if the query does not end with a WHERE clause
    if from_index > 0:
        statement = ''.join(tmp[select_index:])
        if len(statement) > 0:
            parse_result.append(getTableField(statement))
    return parse_result

Running the custom checker on a codebase produces a hierarchical view of the AST, reveals which tables and columns each script touches, and highlights unused imports or variables. The final visual results (shown as images in the original article) demonstrate how the approach can dramatically reduce manual effort in understanding data and code dependencies.
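As a complementary sketch (the `SqlStringFinder` class is a hypothetical helper, not from the article): when SQL is embedded in pandas calls, an `ast.NodeVisitor` can collect the statement strings before handing them to a SQL parser such as the one above.

```python
import ast

class SqlStringFinder(ast.NodeVisitor):
    """Collect string literals passed to read_sql-style calls."""

    def __init__(self):
        self.sql_statements = []

    def visit_Call(self, node):
        func = node.func
        # Match attribute calls such as pd.read_sql(...) / pd.read_sql_query(...)
        if isinstance(func, ast.Attribute) and func.attr.startswith("read_sql"):
            for arg in node.args:
                if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
                    self.sql_statements.append(arg.value)
        self.generic_visit(node)

source = 'df = pd.read_sql("select id, name from users", conn)'
finder = SqlStringFinder()
finder.visit(ast.parse(source))
print(finder.sql_statements)  # ['select id, name from users']
```

Only literal strings are caught this way; queries built dynamically (f-strings, concatenation) would need extra handling.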
Big Data Technology Architecture
Exploring Open Source Big Data and AI Technologies