Step-by-step guide to building automated knowledge graph pipelines using LangGraph and NetworkX

In this tutorial, we demonstrate how to build an automated knowledge graph (KG) pipeline using LangGraph and NetworkX. The pipeline simulates a sequence of intelligent agents that collaborate on data gathering, entity extraction, relation extraction, entity resolution, and graph validation. Starting from a user-provided topic such as “Artificial Intelligence”, the system extracts relevant entities and relationships, resolves duplicates, and integrates the information into a single graph structure. Visualizing the final knowledge graph gives developers and data scientists a clear view of how the concepts relate to one another, which is useful for semantic analysis, natural language processing, and knowledge management.
!pip install langgraph langchain_core
We install two Python libraries: LangGraph, which is used to create and orchestrate agent-based workflows, and LangChain Core, which provides the foundational classes and utilities for building language-model-driven applications. Together they make it straightforward to wire agents into a data pipeline. NetworkX and Matplotlib are also required; they come preinstalled on Colab but must be installed separately in other environments.
import re
import networkx as nx
import matplotlib.pyplot as plt
from typing import TypedDict, List, Tuple, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
We import the libraries needed to build the pipeline: re for regular-expression-based text processing, NetworkX and Matplotlib for creating and visualizing graphs, typing for the type annotations that structure the state, and LangGraph and langchain_core for coordinating the interactions between agents in the workflow.
class KGState(TypedDict):
    topic: str
    raw_text: str
    entities: List[str]
    relations: List[Tuple[str, str, str]]
    resolved_relations: List[Tuple[str, str, str]]
    graph: Any
    validation: Dict[str, Any]
    messages: List[Any]
    current_agent: str
We define a structured data type, KGState, using Python’s TypedDict. It describes the state that is passed between the steps of the pipeline: the chosen topic, the gathered text, the extracted entities and relations, the resolved relations, the constructed graph object, the validation results, the exchanged messages, and the name of the currently active agent.
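Because a TypedDict is an ordinary dict at runtime, an initial state can be produced by a small factory function. The following is a minimal sketch under that assumption; make_empty_state is a hypothetical helper name that does not appear in the tutorial's code, and the class definition is repeated only so the snippet runs on its own.

```python
from typing import Any, Dict, List, Tuple, TypedDict


class KGState(TypedDict):
    topic: str
    raw_text: str
    entities: List[str]
    relations: List[Tuple[str, str, str]]
    resolved_relations: List[Tuple[str, str, str]]
    graph: Any
    validation: Dict[str, Any]
    messages: List[Any]
    current_agent: str


def make_empty_state(topic: str) -> KGState:
    """Hypothetical helper: build a state with every field at its empty default."""
    return KGState(
        topic=topic,
        raw_text="",
        entities=[],
        relations=[],
        resolved_relations=[],
        graph=None,
        validation={},
        messages=[],
        current_agent="data_gatherer",
    )


state = make_empty_state("Artificial Intelligence")
print(type(state) is dict)    # a TypedDict instance is a plain dict at runtime
print(sorted(state.keys()))   # the nine fields declared on KGState
```

The annotations are checked by static type checkers only; nothing is validated when the dictionary is created or mutated at runtime.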
def data_gatherer(state: KGState) -> KGState:
    topic = state["topic"]
    print(f"📚 Data Gatherer: Searching for information about '{topic}'")
    collected_text = f"{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB."
    state["messages"].append(AIMessage(content=f"Collected raw text about {topic}"))
    state["raw_text"] = collected_text
    state["current_agent"] = "entity_extractor"
    return state
The data_gatherer function is the first step in the pipeline. It simulates gathering raw text about the provided topic (stored in state["topic"]). It then stores the simulated text in state["raw_text"], appends a message indicating that data collection is complete, and updates the pipeline state by setting the next agent to entity_extractor.
def entity_extractor(state: KGState) -> KGState:
    print("🔍 Entity Extractor: Identifying entities in the text")
    text = state["raw_text"]
    entities = re.findall(r"Entity[A-Z]", text)
    entities = [state["topic"]] + entities
    state["entities"] = list(set(entities))
    state["messages"].append(AIMessage(content=f"Extracted entities: {state['entities']}"))
    print(f" Found entities: {state['entities']}")
    state["current_agent"] = "relation_extractor"
    return state
The entity_extractor function identifies entities in the gathered text using a simple regular expression that matches terms such as “EntityA” and “EntityB”. It also adds the main topic as an entity and removes duplicates by passing the list through a set, which means the order of the resulting list is not guaranteed. The extracted entities are stored in the state, an AI message records the result, and the pipeline proceeds to the relation_extractor agent.
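Since a set does not preserve insertion order, the entity list can come back in a different order from run to run. If a stable order is wanted, one option is to deduplicate with dict.fromkeys instead. This is a small sketch of that alternative, not the tutorial's own code; the sample text mirrors the simulated text produced by data_gatherer.

```python
import re

topic = "Artificial Intelligence"
text = (
    f"{topic} is an important concept. It relates to various entities like "
    "EntityA, EntityB, and EntityC. EntityA influences EntityB. "
    "EntityC is a type of EntityB."
)

# Every occurrence is returned, so repeated mentions appear more than once.
matches = re.findall(r"Entity[A-Z]", text)

# dict.fromkeys keeps the first occurrence of each key, in insertion order.
unique_entities = list(dict.fromkeys([topic] + matches))

print(matches)
print(unique_entities)
# → ['Artificial Intelligence', 'EntityA', 'EntityB', 'EntityC']
```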
def relation_extractor(state: KGState) -> KGState:
    print("🔗 Relation Extractor: Identifying relationships between entities")
    text = state["raw_text"]
    entities = state["entities"]
    relations = []
    # Phrase patterns paired with relation labels; the loop below uses only the labels.
    relation_patterns = [
        (r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),
        (r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),
        (r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of")
    ]
    for e1 in entities:
        for e2 in entities:
            if e1 != e2:
                for pattern, rel_type in relation_patterns:
                    # Record the relation if the label occurs between e1 and e2,
                    # or simply if e1 appears anywhere before e2 in the text.
                    if (re.search(f"{e1}.*{rel_type}.*{e2}", text.replace("_", " "), re.IGNORECASE) or
                            re.search(f"{e1}.*{e2}", text, re.IGNORECASE)):
                        relations.append((e1, rel_type, e2))
    state["relations"] = relations
    state["messages"].append(AIMessage(content=f"Extracted relations: {relations}"))
    print(f" Found relations: {relations}")
    state["current_agent"] = "entity_resolver"
    return state
The relation_extractor function detects relationships between entities in the raw text. It defines regular-expression patterns for phrases such as “relates to”, “influences”, and “is a type of”, each paired with a relation label. For every ordered pair of entities, a match adds a (subject, predicate, object) triple to the list of relations. Note that the fallback check accepts any pair in which the first entity appears before the second in the text, so this simplified version over-generates relations. The extracted relations are stored in the state, a message is recorded for agent communication, and control passes to the next agent, entity_resolver.
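A stricter variant would apply the phrase patterns directly and keep only the matches whose subject and object are both known entities. The sketch below shows one way this could look; extract_relations_strict is a hypothetical name, and this is an alternative under stated assumptions rather than the tutorial's method.

```python
import re
from typing import List, Tuple

RELATION_PATTERNS = [
    (r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),
    (r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),
    (r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of"),
]


def extract_relations_strict(text: str, entities: List[str]) -> List[Tuple[str, str, str]]:
    """Hypothetical helper: keep only pattern matches between known entities."""
    known = set(entities)
    relations = []
    for pattern, rel_type in RELATION_PATTERNS:
        # With two capture groups, findall yields (subject, object) pairs.
        for subj, obj in re.findall(pattern, text):
            if subj in known and obj in known:
                relations.append((subj, rel_type, obj))
    return relations


text = (
    "Artificial Intelligence is an important concept. It relates to various "
    "entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. "
    "EntityC is a type of EntityB."
)
entities = ["Artificial Intelligence", "EntityA", "EntityB", "EntityC"]

relations = extract_relations_strict(text, entities)
print(relations)
# → [('EntityA', 'influences', 'EntityB'), ('EntityC', 'is_type_of', 'EntityB')]
```

The trade-off is visible in the output: the sentence "It relates to various entities" is discarded because neither "It" nor "various" is a known entity, so the topic itself ends up with no relations. Linking the topic would need pronoun handling or patterns that allow multi-word names.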
def entity_resolver(state: KGState) -> KGState:
    print("🔄 Entity Resolver: Resolving duplicate entities")
    entity_map = {}
    for entity in state["entities"]:
        canonical_name = entity.lower().replace(" ", "_")
        entity_map[entity] = canonical_name
    resolved_relations = []
    for s, p, o in state["relations"]:
        s_resolved = entity_map.get(s, s)
        o_resolved = entity_map.get(o, o)
        resolved_relations.append((s_resolved, p, o_resolved))
    state["resolved_relations"] = resolved_relations
    state["messages"].append(AIMessage(content=f"Resolved relations: {resolved_relations}"))
    state["current_agent"] = "graph_integrator"
    return state
The entity_resolver function normalizes entity names to avoid duplicates and inconsistencies. It builds a mapping (entity_map) by converting each entity to lowercase and replacing spaces with underscores. The mapping is then applied to the subject and object of every extracted relation to produce the resolved relations. These normalized triples are stored in the state, a confirmation message is recorded, and control passes to the graph_integrator agent.
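To illustrate the effect of this normalization, the short sketch below applies the same lowercase-and-underscore rule to triples that differ only in spelling. The canonicalize helper and the sample triples are made up for the example, and the final deduplication step is an optional addition that the tutorial's function does not perform.

```python
from typing import List, Tuple


def canonicalize(name: str) -> str:
    """Same rule as the resolver: lowercase, spaces replaced by underscores."""
    return name.lower().replace(" ", "_")


# Illustrative triples in which the same entities are spelled differently.
relations: List[Tuple[str, str, str]] = [
    ("Artificial Intelligence", "relates_to", "EntityA"),
    ("artificial intelligence", "relates_to", "entitya"),
    ("EntityA", "influences", "EntityB"),
]

resolved = [(canonicalize(s), p, canonicalize(o)) for s, p, o in relations]

# Optional extra step: drop triples that became identical after normalization.
unique_resolved = list(dict.fromkeys(resolved))

print(resolved)
print(unique_resolved)
```

String normalization of this kind only merges names that differ in case or spacing; aliases such as "AI" would still need a lookup table or a similarity-based approach.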
def graph_integrator(state: KGState) -> KGState:
    print("📊 Graph Integrator: Building the knowledge graph")
    G = nx.DiGraph()
    for s, p, o in state["resolved_relations"]:
        if not G.has_node(s):
            G.add_node(s)
        if not G.has_node(o):
            G.add_node(o)
        G.add_edge(s, o, relation=p)
    state["graph"] = G
    state["messages"].append(AIMessage(content=f"Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges"))
    state["current_agent"] = "graph_validator"
    return state
The graph_integrator function builds the actual knowledge graph using nx.DiGraph(), which supports directed relationships. It iterates over the resolved (subject, predicate, object) triples, ensures that both nodes exist, and then adds a directed edge with the predicate stored in the edge’s relation attribute. The resulting graph is saved in the state, a summary message is appended, and the pipeline moves on to the graph_validator agent for final validation.
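One property of nx.DiGraph worth keeping in mind is that it stores at most one edge per ordered pair of nodes, so a second triple between the same subject and object overwrites the relation attribute of the first. The sketch below demonstrates this and shows nx.MultiDiGraph as one possible alternative that keeps parallel edges; the triples are illustrative, and the choice of the predicate as edge key is an assumption of this example.

```python
import networkx as nx

# Illustrative triples; the first two share the same subject and object.
triples = [
    ("entitya", "relates_to", "entityb"),
    ("entitya", "influences", "entityb"),
    ("entityc", "is_type_of", "entityb"),
]

# DiGraph: add_edge creates missing nodes and updates an existing edge in place.
G = nx.DiGraph()
for s, p, o in triples:
    G.add_edge(s, o, relation=p)

# MultiDiGraph: using the predicate as the edge key keeps both parallel edges.
M = nx.MultiDiGraph()
for s, p, o in triples:
    M.add_edge(s, o, key=p, relation=p)

print(G.number_of_edges(), G["entitya"]["entityb"]["relation"])
print(M.number_of_edges(), sorted(M["entitya"]["entityb"]))
```

With a MultiDiGraph, edge attributes are keyed by (source, target, key) rather than (source, target), so a plotting helper that draws edge labels would likely need to be adapted.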
def graph_validator(state: KGState) -> KGState:
    print("✅ Graph Validator: Validating knowledge graph")
    G = state["graph"]
    validation_report = {
        "num_nodes": len(G.nodes),
        "num_edges": len(G.edges),
        "is_connected": nx.is_weakly_connected(G) if G.nodes else False,
        "has_cycles": not nx.is_directed_acyclic_graph(G) if G.nodes else False
    }
    state["validation"] = validation_report
    state["messages"].append(AIMessage(content=f"Validation report: {validation_report}"))
    print(f" Validation report: {validation_report}")
    state["current_agent"] = END
    return state
The graph_validator function performs basic sanity checks on the constructed knowledge graph. It compiles a validation report containing the number of nodes and edges, whether the graph is weakly connected (i.e., every node is reachable from every other when edge direction is ignored), and whether the graph contains cycles. The report is added to the state and recorded as an AI message. Once validation is complete, the pipeline is marked as finished by setting current_agent to END.
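The behavior of these checks is easiest to see on a few tiny graphs. The sketch below wraps the same NetworkX calls in a standalone function (validation_report is a hypothetical name for this example) and runs it on a chain, a two-node loop, a graph with two separate components, and an empty graph.

```python
import networkx as nx


def validation_report(G: nx.DiGraph) -> dict:
    """Hypothetical standalone version of the checks used by the validator."""
    has_nodes = G.number_of_nodes() > 0
    return {
        "num_nodes": G.number_of_nodes(),
        "num_edges": G.number_of_edges(),
        # Connectivity is undefined for an empty graph, hence the guard.
        "is_connected": nx.is_weakly_connected(G) if has_nodes else False,
        "has_cycles": (not nx.is_directed_acyclic_graph(G)) if has_nodes else False,
    }


chain = nx.DiGraph([("a", "b"), ("b", "c")])   # a -> b -> c
loop = nx.DiGraph([("a", "b"), ("b", "a")])    # a <-> b
split = nx.DiGraph([("a", "b"), ("c", "d")])   # two separate components
empty = nx.DiGraph()

for name, graph in [("chain", chain), ("loop", loop), ("split", split), ("empty", empty)]:
    print(name, validation_report(graph))
```

The guard on the node count matters for the connectivity check: NetworkX raises an exception when nx.is_weakly_connected is called on a graph with no nodes.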
def router(state: KGState) -> str:
    return state["current_agent"]

def visualize_graph(graph):
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(graph)
    nx.draw(graph, pos, with_labels=True, node_color="skyblue", node_size=1500, font_size=10)
    edge_labels = nx.get_edge_attributes(graph, 'relation')
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
    plt.title("Knowledge Graph")
    plt.tight_layout()
    plt.show()
The router function directs the pipeline to the next agent based on the current_agent field of the state. The visualize_graph function uses Matplotlib and NetworkX to draw the final knowledge graph, showing the nodes, the edges, and the relation label on each edge.
def build_kg_graph():
    workflow = StateGraph(KGState)
    workflow.add_node("data_gatherer", data_gatherer)
    workflow.add_node("entity_extractor", entity_extractor)
    workflow.add_node("relation_extractor", relation_extractor)
    workflow.add_node("entity_resolver", entity_resolver)
    workflow.add_node("graph_integrator", graph_integrator)
    workflow.add_node("graph_validator", graph_validator)
    workflow.add_conditional_edges("data_gatherer", router,
                                   {"entity_extractor": "entity_extractor"})
    workflow.add_conditional_edges("entity_extractor", router,
                                   {"relation_extractor": "relation_extractor"})
    workflow.add_conditional_edges("relation_extractor", router,
                                   {"entity_resolver": "entity_resolver"})
    workflow.add_conditional_edges("entity_resolver", router,
                                   {"graph_integrator": "graph_integrator"})
    workflow.add_conditional_edges("graph_integrator", router,
                                   {"graph_validator": "graph_validator"})
    workflow.add_conditional_edges("graph_validator", router,
                                   {END: END})
    workflow.set_entry_point("data_gatherer")
    return workflow.compile()
The build_kg_graph function uses LangGraph to define the complete knowledge graph workflow. It adds each agent as a node, from data gathering through graph validation, and connects them with conditional edges that route on the current agent. The entry point is set to data_gatherer, and the graph is compiled into an executable workflow that drives the pipeline from start to finish.
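The routing idea itself can be illustrated without LangGraph. The plain-Python loop below is only a conceptual sketch of the control flow that the conditional edges encode, not a description of how LangGraph executes a graph internally. The three toy agents, the trace field, and the END stand-in are all made up for this example.

```python
from typing import Any, Callable, Dict

END = "__end__"  # stand-in sentinel for this sketch only

State = Dict[str, Any]


def gather(state: State) -> State:
    state["trace"].append("gather")
    state["current_agent"] = "extract"   # name the agent that should run next
    return state


def extract(state: State) -> State:
    state["trace"].append("extract")
    state["current_agent"] = "validate"
    return state


def validate(state: State) -> State:
    state["trace"].append("validate")
    state["current_agent"] = END         # signal that the pipeline is finished
    return state


AGENTS: Dict[str, Callable[[State], State]] = {
    "gather": gather,
    "extract": extract,
    "validate": validate,
}


def run(state: State) -> State:
    # Follow current_agent from one agent to the next until END is reached.
    while state["current_agent"] != END:
        state = AGENTS[state["current_agent"]](state)
    return state


final = run({"trace": [], "current_agent": "gather"})
print(final["trace"])
# → ['gather', 'extract', 'validate']
```

Each agent decides its own successor by writing to current_agent, which is the same convention the tutorial's agents follow before the router reads that field.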
def run_knowledge_graph_pipeline(topic):
    print(f"🚀 Starting knowledge graph pipeline for: {topic}")
    initial_state = {
        "topic": topic,
        "raw_text": "",
        "entities": [],
        "relations": [],
        "resolved_relations": [],
        "graph": None,
        "validation": {},
        "messages": [HumanMessage(content=f"Build a knowledge graph about {topic}")],
        "current_agent": "data_gatherer"
    }
    kg_app = build_kg_graph()
    final_state = kg_app.invoke(initial_state)
    print(f"✨ Knowledge graph construction complete for: {topic}")
    return final_state
The run_knowledge_graph_pipeline function initializes the pipeline by creating a state dictionary that holds the provided topic and empty defaults for the remaining fields. It builds the workflow with build_kg_graph() and runs it by invoking the compiled graph with the initial state. As each agent processes the data, the state evolves, and the final state contains the completed, validated knowledge graph.
if __name__ == "__main__":
    topic = "Artificial Intelligence"
    result = run_knowledge_graph_pipeline(topic)
    visualize_graph(result["graph"])
Finally, this block serves as the script’s entry point. When the script is executed directly, it runs the knowledge graph pipeline for the topic “Artificial Intelligence” through all agent stages and then visualizes the resulting graph with visualize_graph(). It provides an end-to-end demonstration of automated knowledge graph generation.
In conclusion, we have seen how to combine multiple specialized agents into a cohesive knowledge graph pipeline using LangGraph and NetworkX. The workflow automates entity and relation extraction and visualizes the resulting relationships, giving a clear and actionable representation of the information. By tuning or replacing individual agents, for example by adopting more sophisticated entity recognition methods or integrating real-time data sources, this basic framework can be scaled and customized for advanced knowledge graph construction tasks in various domains.

Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform that stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform has over 2 million views per month, demonstrating its popularity among its audience.
