Fundamentals 12 min read

Deriving Data Lineage from Python Code Using AST and Pyflakes

This article explains how to automatically extract data lineage and code dependencies from large collections of Python scripts by leveraging the language's compilation stages, abstract syntax trees, and the Pyflakes static‑analysis library, providing practical code examples and custom parsers for SQL extraction.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Deriving Data Lineage from Python Code Using AST and Pyflakes

Developers and data engineers often struggle with hundreds of tables and slow, risky SQL scripts, fearing that a single code change could cause production incidents. Data lineage—tracking how data is created, transformed, and consumed—offers a systematic way to understand these complex relationships.

The article introduces a method for deriving data lineage directly from Python code. It first reviews the CPython compilation pipeline: source code → parser tree → abstract syntax tree (AST) → control‑flow graph → bytecode execution.

Static analysis tools in IDEs perform the first three steps to detect syntax errors. By reusing this principle, one can parse Python scripts to automatically extract valuable metadata such as external function calls, referenced SQL statements, accessed tables and fields, and transformation logic.

Using the Pyflakes library as a foundation, the article shows how to invoke its API programmatically instead of via the command line. The following snippet demonstrates the basic setup:

from pyflakes import reporter as modReporter
from pyflakes import api

if __name__ == "__main__":
    reporter = modReporter._makeDefaultReporter()
    args = ['C:\\Users\\yzeng\\PycharmProjects\\pythonProject\\flakes']
    warnings = api.checkRecursive(args, reporter)

Pyflakes parses each file into an AST, tokenizes the source, and walks the tree to identify imports, unused variables, and other issues. The article then extends this approach with a custom linkage_Checker class that traverses the AST, prints node types, depths, and positional information, and extracts code fragments based on line/column ranges.

# Borrowing ideas from Pyflakes
class linkage_Checker:
    nodeDepth = 0
    def __init__(self, tree, file_tokens=(), filename='(none)', codestr='none'):
        self._nodeHandlers = {}
        self.codelines = codestr.decode().split('
')
        self.handleChildren(tree)
    # Traverse the AST
    def handleChildren(self, tree, omit=None):
        for node in checker.iter_child_nodes(tree, omit=omit):
            self.handleNode(node, tree)
    # Process each node
    def handleNode(self, node, parent):
        if node is None:
            return
        self.nodeDepth += 1
        print('-----------------')
        print('节点类型:%s' % node.__class__)
        print('节点层次:%s' % self.nodeDepth)
        try:
            fields = '/'.join([field for field in node.__class__._fields])
            print('节点属性:%s' % fields)
        except:
            print(123)
        lineno = getattr(node, 'lineno', 0)
        end_lineno = getattr(node, 'end_lineno', 0)
        col_offset = getattr(node, 'col_offset', 0)
        end_col_offset = getattr(node, 'end_col_offset', 0)
        print('起始行:%s' % lineno)
        print('结束行:%s' % end_lineno)
        print('起始列:%s' % col_offset)
        print('结束列:%s' % end_col_offset)
        if lineno > 0:
            getCodebyposition(self.codelines, lineno, end_lineno, col_offset, end_col_offset)
        try:
            handler = self.getNodeHandler(node.__class__)
            handler(node)
        finally:
            self.nodeDepth -= 1
    def getNodeHandler(self, node_class):
        try:
            return self._nodeHandlers[node_class]
        except KeyError:
            nodeType = checker.getNodeType(node_class)
        self._nodeHandlers[node_class] = handler = getattr(self, nodeType, self._unknown_handler)
        return handler
    def _unknown_handler(self, node):
        self.handleChildren(node)

def check(codestr, filename, reporter=None):
    try:
        tree = ast.parse(codestr, filename=filename)
    except SyntaxError:
        value = sys.exc_info()[1]
        msg = value.args[0]
        (lineno, offset, text) = value.lineno, value.offset, value.text
        print(lineno, offset, text)
    file_tokens = checker.make_tokens(codestr)
    w = linkage_Checker(tree, file_tokens=file_tokens, filename=filename, codestr=codestr)
    return 1

def getCodebyposition(codelines, lineno, end_lineno, col_offset, end_col_offset):
    for i in range(lineno, end_lineno+1):
        if i == lineno and lineno == end_lineno:
            print(codelines[lineno-1][col_offset:end_col_offset])
        elif i == lineno:
            print(codelines[lineno-1][col_offset:])
        elif i == end_lineno:
            print(codelines[end_lineno-1][:end_col_offset])
        else:
            print(codelines[i-1])
    return 1

With the AST understood, the article shows how to locate specific patterns such as SQL queries embedded in pandas calls. Two helper functions parse the SELECT‑FROM clause to retrieve table names and column lists:

# Extract tables and fields from a SQL statement
def getTableField(statement):
    result = {}
    matchObj = re.search(r'select(.*)from(.*)', statement, re.M|re.I)
    if pd.notnull(matchObj):
        fields = re.split(',', matchObj.group(1))
        fields = [field.strip() for field in fields]
        table = matchObj.group(2).strip()
        result[table] = fields
    return result

# Simple SQL parser used by the checker
def sqlparse(sql_str):
    sql_str = sql_str.replace('SELECT', 'select')
    sql_str = sql_str.replace('WHERE', 'where')
    sql_str = sql_str.replace('FROM', 'from')
    re_skip_detail = re.compile("([a-zA-Z0-9]+)")
    tmp = re_skip_detail.split(sql_str)
    select_index = from_index = 0
    parse_result = []
    for index, item in enumerate(tmp):
        if item in ('select', 'where'):
            if from_index > 0:
                statement = ''.join(tmp[select_index:index])
                if len(statement) > 0:
                    table_fields = getTableField(statement)
                    parse_result.append(table_fields)
                from_index = 0
            if item == 'select':
                select_index = index
        elif item == 'from':
            from_index = index
    return parse_result

Running the custom checker on a codebase produces a hierarchical view of the AST, reveals which tables and columns each script touches, and highlights unused imports or variables. The final visual results (shown as images in the original article) demonstrate how the approach can dramatically reduce manual effort in understanding data and code dependencies.

big dataASTdata lineagestatic analysisCode Parsingpyflakes
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.