
Use RAG, agents, multi-turn chat, and performance monitoring to build a GPU-accelerated Ollama + LangChain workflow

In this tutorial, we build a GPU-capable local LLM stack that unifies Ollama and LangChain. We install the required libraries, start the Ollama server, pull a model, and wrap it in a custom LangChain LLM, allowing us to control temperature, token limits, and context. We add a retrieval-augmented generation layer that ingests PDFs or text files, chunks them, embeds them with a sentence-transformers model, and serves grounded answers. We manage multi-turn chat memory, register tools (web search + RAG query), and spin up an agent that decides when to call them.

import os
import sys
import subprocess
import time
import threading
import queue
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor


def install_packages():
    """Install required packages for Colab environment"""
    packages = [
        "langchain",
        "langchain-community",
        "langchain-core",
        "chromadb",
        "sentence-transformers",
        "faiss-cpu",
        "pypdf",
        "python-docx",
        "requests",
        "psutil",
        "pyngrok",
        "gradio"
    ]
   
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])


install_packages()


import requests
import psutil
import threading
from queue import Queue
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain.chains import ConversationChain, RetrievalQA
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.agents import AgentType, initialize_agent, Tool
from langchain.tools import DuckDuckGoSearchRun

We import the Python utilities we need in Colab for concurrency, system calls, and JSON handling. We define and run install_packages() to pull in LangChain, embeddings, vector stores, document loaders, monitoring, and UI dependencies. We then import the LangChain LLM, memory, retrieval, and agent tooling (including DuckDuckGo search) so we can build a capable RAG and agent workflow.


@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4

We define an OllamaConfig dataclass so we keep all Ollama runtime settings in one clean place. We set the model name and local API endpoint, as well as the generation behavior (max_tokens, temperature, and context_window). We control performance with gpu_layers (-1 = load all layers onto the GPU when possible), batch_size, and threads for parallelism.
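As a quick illustration, here is a minimal sketch of overriding just the fields we care about (the model tag and values below are hypothetical; any model available in the Ollama registry works):

config = OllamaConfig(
    model_name="llama2:13b",   # hypothetical tag; swap in any pulled Ollama model
    temperature=0.2,           # lower temperature for more deterministic answers
    context_window=8192,       # larger context if GPU memory allows
)
print(config.model_name, config.max_tokens)  # unset fields keep their defaults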


class OllamaManager:
    """Advanced Ollama manager for Colab environment"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.process = None
        self.is_running = False
        self.models_cache = {}
        self.performance_monitor = PerformanceMonitor()
       
    def install_ollama(self):
        """Install Ollama in Colab environment"""
        try:
            subprocess.run([
                "curl", "-fsSL", " "-o", "/tmp/install.sh"
            ], check=True)
           
            subprocess.run(["bash", "/tmp/install.sh"], check=True)
            print("✅ Ollama installed successfully")
           
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install Ollama: {e}")
            raise
   
    def start_server(self):
        """Start Ollama server with GPU support"""
        if self.is_running:
            print("Ollama server is already running")
            return
           
        try:
            env = os.environ.copy()
            env["OLLAMA_NUM_PARALLEL"] = str(self.config.threads)
            env["OLLAMA_MAX_LOADED_MODELS"] = "3"
           
            self.process = subprocess.Popen(
                ["ollama", "serve"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
           
            time.sleep(5)
           
            if self.health_check():
                self.is_running = True
                print("✅ Ollama server started successfully")
                self.performance_monitor.start()
            else:
                raise Exception("Server failed to start properly")
               
        except Exception as e:
            print(f"❌ Failed to start Ollama server: {e}")
            raise
   
    def health_check(self) -> bool:
        """Check if Ollama server is healthy"""
        try:
            response = requests.get(f"{self.config.base_url}/api/tags", timeout=10)
            return response.status_code == 200
        except:
            return False
   
    def pull_model(self, model_name: str) -> bool:
        """Pull a model from Ollama registry"""
        try:
            print(f"🔄 Pulling model: {model_name}")
            result = subprocess.run(
                ["ollama", "pull", model_name],
                capture_output=True,
                text=True,
                timeout=1800  
            )
           
            if result.returncode == 0:
                print(f"✅ Model {model_name} pulled successfully")
                self.models_cache[model_name] = True
                return True
            else:
                print(f"❌ Failed to pull model {model_name}: {result.stderr}")
                return False
               
        except subprocess.TimeoutExpired:
            print(f"❌ Timeout pulling model {model_name}")
            return False
        except Exception as e:
            print(f"❌ Error pulling model {model_name}: {e}")
            return False
   
    def list_models(self) -> List[str]:
        """List available local models"""
        try:
            result = subprocess.run(
                ["ollama", "list"],
                capture_output=True,
                text=True
            )
           
            models = []
            for line in result.stdout.split('\n')[1:]:
                if line.strip():
                    model_name = line.split()[0]
                    models.append(model_name)
                   
            return models
           
        except Exception as e:
            print(f"❌ Error listing models: {e}")
            return []
   
    def stop_server(self):
        """Stop Ollama server"""
        if self.process:
            self.process.terminate()
            self.process.wait()
            self.is_running = False
            self.performance_monitor.stop()
            print("✅ Ollama server stopped")

We create the OllamaManager class to install, start, monitor, and manage the Ollama server in the Colab environment. We set environment variables for parallelism, run the server in the background, and verify it with a health check. We pull models on demand, cache them, list the locally available models, and gracefully shut the server down when the task is complete, tracking performance throughout.
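As a standalone sketch of that lifecycle (in the full system, OllamaLangChainSystem.setup() drives these same calls for us):

manager = OllamaManager(OllamaConfig())
manager.install_ollama()              # one-time install into the Colab VM
manager.start_server()                # launches `ollama serve` in the background
if manager.pull_model("llama2"):      # blocking download; may take several minutes
    print(manager.list_models())      # confirm the model is available locally
manager.stop_server()                 # free the port and stop monitoring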


class PerformanceMonitor:
    """Monitor system performance and resource usage"""
   
    def __init__(self):
        self.monitoring = False
        self.stats = {
            "cpu_usage": [],
            "memory_usage": [],
            "gpu_usage": [],
            "inference_times": []
        }
        self.monitor_thread = None
   
    def start(self):
        """Start performance monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
   
    def stop(self):
        """Stop performance monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
   
    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
               
                self.stats["cpu_usage"].append(cpu_percent)
                self.stats["memory_usage"].append(memory.percent)
               
                for key in ["cpu_usage", "memory_usage"]:
                    if len(self.stats[key]) > 100:
                        self.stats[key] = self.stats[key][-100:]
               
                time.sleep(5)
               
            except Exception as e:
                print(f"Monitoring error: {e}")
   
    def get_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        return {
            "avg_cpu": sum(self.stats["cpu_usage"][-10:]) / max(len(self.stats["cpu_usage"][-10:]), 1),
            "avg_memory": sum(self.stats["memory_usage"][-10:]) / max(len(self.stats["memory_usage"][-10:]), 1),
            "total_inferences": len(self.stats["inference_times"]),
            "avg_inference_time": sum(self.stats["inference_times"]) / max(len(self.stats["inference_times"]), 1)
        }

We define a PerformanceMonitor class that tracks CPU, memory, and inference time in real time while the Ollama server runs. We start a background thread that collects statistics every few seconds, keeps only the most recent metrics, and reports average usage on demand. This helps us watch system load and optimize performance during model inference.
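A minimal sketch of using the monitor on its own; we let a few sampling cycles pass before reading the rolling averages:

monitor = PerformanceMonitor()
monitor.start()              # background daemon thread begins sampling
time.sleep(15)               # allow a few samples (one roughly every 6 seconds)
print(monitor.get_stats())   # averages computed over the last 10 samples
monitor.stop()               # joins the thread cleanly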


class OllamaLLM(LLM):
    """Custom LangChain LLM for Ollama"""
   
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    temperature: float = 0.7
    max_tokens: int = 2048
    performance_monitor: Optional[PerformanceMonitor] = None
   
    @property
    def _llm_type(self) -> str:
        return "ollama"
   
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Make API call to Ollama"""
        start_time = time.time()
       
        try:
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": self.temperature,
                    "num_predict": self.max_tokens,
                    "stop": stop or []
                }
            }
           
            response = requests.post(
                f"{self.base_url}/api/generate",
                json=payload,
                timeout=120
            )
           
            response.raise_for_status()
            result = response.json()
           
            inference_time = time.time() - start_time
           
            if self.performance_monitor:
                self.performance_monitor.stats["inference_times"].append(inference_time)
           
            return result.get("response", "")
           
        except Exception as e:
            print(f"❌ Ollama API error: {e}")
            return f"Error: {str(e)}"

We wrap the Ollama API in a custom OllamaLLM class that conforms to LangChain's LLM interface. We define how prompts are sent to the Ollama server and record each inference time for performance tracking. This lets us plug Ollama directly into LangChain chains, agents, and memory components while monitoring efficiency.
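Because the wrapper implements LangChain's LLM interface, we can invoke it like any other LangChain LLM. A minimal sketch, assuming the server is already running with llama2 pulled:

llm = OllamaLLM(model_name="llama2", temperature=0.3, max_tokens=256)
print(llm("Summarize what a vector store does in one sentence."))
# LangChain routes the call through _call(), so inference time is recorded
# whenever a performance_monitor is attached.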

class RAGSystem:
    """Retrieval-Augmented Generation system"""
   
    def __init__(self, llm: OllamaLLM, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        self.vector_store = None
        self.qa_chain = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
   
    def add_documents(self, file_paths: List[str]):
        """Add documents to the vector store"""
        documents = []
       
        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path)
               
                docs = loader.load()
                documents.extend(docs)
               
            except Exception as e:
                print(f"❌ Error loading {file_path}: {e}")
       
        if documents:
            splits = self.text_splitter.split_documents(documents)
           
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(splits, self.embeddings)
            else:
                self.vector_store.add_documents(splits)
           
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(search_kwargs={"k": 3}),
                return_source_documents=True
            )
           
            print(f"✅ Added {len(splits)} document chunks to vector store")
   
    def query(self, question: str) -> Dict[str, Any]:
        """Query the RAG system"""
        if not self.qa_chain:
            return {"answer": "No documents loaded. Please add documents first."}
       
        try:
            result = self.qa_chain({"query": question})
            return {
                "answer": result["result"],
                "sources": [doc.metadata for doc in result.get("source_documents", [])]
            }
        except Exception as e:
            return {"answer": f"Error: {str(e)}"}

We build a RAGSystem that loads PDFs or plain-text files, splits them into overlapping chunks, and embeds them with a sentence-transformers model into a FAISS vector store. We wire the store into a RetrievalQA chain that stuffs the top-3 retrieved chunks into the prompt and returns both the answer and the source metadata, so every response stays traceable to the loaded documents.
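A minimal usage sketch with hypothetical file paths; the first add_documents() call builds the FAISS index and the QA chain:

rag = RAGSystem(llm)                           # reuses the OllamaLLM wrapper above
rag.add_documents(["paper.pdf", "notes.txt"])  # hypothetical paths in the Colab VM
result = rag.query("What are the key findings?")
print(result["answer"])
print(result["sources"])                       # metadata of the retrieved chunks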

class ConversationManager:
    """Manage conversation history and memory"""
   
    def __init__(self, llm: OllamaLLM, memory_type: str = "buffer"):
        self.llm = llm
        self.conversations = {}
        self.memory_type = memory_type
       
    def get_conversation(self, session_id: str) -> ConversationChain:
        """Get or create conversation for session"""
        if session_id not in self.conversations:
            if self.memory_type == "buffer":
                memory = ConversationBufferWindowMemory(k=10)
            elif self.memory_type == "summary":
                memory = ConversationSummaryBufferMemory(
                    llm=self.llm,
                    max_token_limit=1000
                )
            else:
                memory = ConversationBufferWindowMemory(k=10)
           
            self.conversations[session_id] = ConversationChain(
                llm=self.llm,
                memory=memory,
                verbose=True
            )
       
        return self.conversations[session_id]
   
    def chat(self, session_id: str, message: str) -> str:
        """Chat with specific session"""
        conversation = self.get_conversation(session_id)
        return conversation.predict(input=message)
   
    def clear_session(self, session_id: str):
        """Clear conversation history for session"""
        if session_id in self.conversations:
            del self.conversations[session_id]


class OllamaLangChainSystem:
    """Main system integrating all components"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.manager = OllamaManager(config)
        self.llm = None
        self.rag_system = None
        self.conversation_manager = None
        self.tools = []
        self.agent = None
       
    def setup(self):
        """Complete system setup"""
        print("🚀 Setting up Ollama + LangChain system...")
       
        self.manager.install_ollama()
        self.manager.start_server()
       
        if not self.manager.pull_model(self.config.model_name):
            print("❌ Failed to pull default model")
            return False
       
        self.llm = OllamaLLM(
            model_name=self.config.model_name,
            base_url=self.config.base_url,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
            performance_monitor=self.manager.performance_monitor
        )
       
        self.rag_system = RAGSystem(self.llm)
       
        self.conversation_manager = ConversationManager(self.llm)
       
        self._setup_tools()
       
        print("✅ System setup complete!")
        return True
   
    def _setup_tools(self):
        """Setup tools for the agent"""
        search = DuckDuckGoSearchRun()
       
        self.tools = [
            Tool(
                name="Search",
                func=search.run,
                description="Search the internet for current information"
            ),
            Tool(
                name="RAG_Query",
                func=lambda q: self.rag_system.query(q)["answer"],
                description="Query loaded documents using RAG"
            )
        ]
       
        self.agent = initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )
   
    def chat(self, message: str, session_id: str = "default") -> str:
        """Simple chat interface"""
        return self.conversation_manager.chat(session_id, message)
   
    def rag_chat(self, question: str) -> Dict[str, Any]:
        """RAG-based chat"""
        return self.rag_system.query(question)
   
    def agent_chat(self, message: str) -> str:
        """Agent-based chat with tools"""
        return self.agent.run(message)
   
    def switch_model(self, model_name: str) -> bool:
        """Switch to different model"""
        if self.manager.pull_model(model_name):
            self.llm.model_name = model_name
            print(f"✅ Switched to model: {model_name}")
            return True
        return False
   
    def load_documents(self, file_paths: List[str]):
        """Load documents into RAG system"""
        self.rag_system.add_documents(file_paths)
   
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get system performance statistics"""
        return self.manager.performance_monitor.get_stats()
   
    def cleanup(self):
        """Clean up resources"""
        self.manager.stop_server()
        print("✅ System cleanup complete")

We use the ConversationManager to maintain separate chat sessions, each with buffer-based or summary-based memory, so we can preserve or compress context as needed. In OllamaLangChainSystem, we tie everything together: we install and start Ollama, pull the requested model, wrap it in a LangChain-compatible LLM, connect the RAG system, initialize chat memory, and register web search and RAG query tools with a zero-shot ReAct agent.
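A short sketch of that session isolation; the session IDs below are hypothetical:

chat_mgr = ConversationManager(llm, memory_type="summary")
print(chat_mgr.chat("alice", "My name is Alice."))
print(chat_mgr.chat("bob", "What is my name?"))    # separate session: no memory of Alice
print(chat_mgr.chat("alice", "What is my name?"))  # same session: memory recalls Alice
chat_mgr.clear_session("bob")                      # drop one session, keep the others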

def main():
    """Main function demonstrating the system"""
   
    config = OllamaConfig(
        model_name="llama2",
        temperature=0.7,
        max_tokens=2048
    )
   
    system = OllamaLangChainSystem(config)
   
    try:
        if not system.setup():
            return
       
        print("n🗣️ Testing basic chat:")
        response = system.chat("Hello! How are you?")
        print(f"Response: {response}")
       
        print("n🔄 Testing model switching:")
        models = system.manager.list_models()
        print(f"Available models: {models}")
       
       
        print("n🤖 Testing agent:")
        agent_response = system.agent_chat("What's the current weather like?")
        print(f"Agent Response: {agent_response}")
       
        print("n📊 Performance Statistics:")
        stats = system.get_performance_stats()
        print(json.dumps(stats, indent=2))
       
    except KeyboardInterrupt:
        print("n⏹️ Interrupted by user")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        system.cleanup()


def create_gradio_interface(system: OllamaLangChainSystem):
    """Create a Gradio interface for easy interaction"""
    try:
        import gradio as gr
       
        def chat_interface(message, history, mode):
            if mode == "Basic Chat":
                response = system.chat(message)
            elif mode == "RAG Chat":
                result = system.rag_chat(message)
                response = result["answer"]
            elif mode == "Agent Chat":
                response = system.agent_chat(message)
            else:
                response = "Unknown mode"
           
            history.append((message, response))
            return "", history
       
        def upload_docs(files):
            if files:
                file_paths = [f.name for f in files]
                system.load_documents(file_paths)
                return f"Loaded {len(file_paths)} documents into RAG system"
            return "No files uploaded"
       
        def get_stats():
            stats = system.get_performance_stats()
            return json.dumps(stats, indent=2)
       
        with gr.Blocks(title="Ollama + LangChain System") as demo:
            gr.Markdown("# 🦙 Ollama + LangChain Advanced System")
           
            with gr.Tab("Chat"):
                chatbot = gr.Chatbot()
                mode = gr.Dropdown(
                    ["Basic Chat", "RAG Chat", "Agent Chat"],
                    value="Basic Chat",
                    label="Chat Mode"
                )
                msg = gr.Textbox(label="Message")
                clear = gr.Button("Clear")
               
                msg.submit(chat_interface, [msg, chatbot, mode], [msg, chatbot])
                clear.click(lambda: ([], ""), outputs=[chatbot, msg])
           
            with gr.Tab("Document Upload"):
                file_upload = gr.File(file_count="multiple", label="Upload Documents")
                upload_btn = gr.Button("Upload to RAG System")
                upload_status = gr.Textbox(label="Status")
               
                upload_btn.click(upload_docs, file_upload, upload_status)
           
            with gr.Tab("Performance"):
                stats_btn = gr.Button("Get Performance Stats")
                stats_output = gr.Textbox(label="Performance Statistics")
               
                stats_btn.click(get_stats, outputs=stats_output)
       
        return demo
       
    except ImportError:
        print("Gradio not installed. Skipping interface creation.")
        return None


if __name__ == "__main__":
    print("🚀 Ollama + LangChain System for Google Colab")
    print("=" * 50)
   
    main()
   
    # Or create a system instance for interactive use
    # config = OllamaConfig(model_name="llama2")
    # system = OllamaLangChainSystem(config)
    # system.setup()
   
    # # Create Gradio interface
    # demo = create_gradio_interface(system)
    # if demo:
    #     demo.launch(share=True)  # share=True for public link

We wrap everything in a main() function that runs a full demo: it sets up the system, tests basic chat, lists models, exercises the agent, and prints performance statistics. Then, in create_gradio_interface(), we build a user-friendly Gradio app with tabs for chatting, uploading documents into the RAG system, and monitoring performance. Finally, we call main() in the __main__ block for direct Colab execution, or optionally launch the Gradio UI for interactive exploration and public sharing.

In short, we end up with a flexible playground: we switch Ollama models, chat with buffered or summary memory, question our own documents, fall back to web search when context is missing, and watch basic resource statistics to stay within Colab limits. The code is modular, so we can extend the tool list, adjust the inference options in OllamaConfig (temperature, max tokens, concurrency), or retune the RAG pipeline for a larger corpus or a different embedding model. We can also launch the Gradio app with share=True to collaborate, or embed these components into our own projects. We now have an extensible template for fast local LLM experiments.
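For example, a brief sketch of that tuning, assuming a system instance from setup() is still running (the model tag below is hypothetical):

system.switch_model("mistral")     # hypothetical tag; pulled on demand if missing
system.llm.temperature = 0.9       # more exploratory sampling for brainstorming
system.llm.max_tokens = 512        # cap response length to keep Colab runs fast
print(system.chat("Give me three ideas for a local-LLM side project."))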




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform known for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable to a wide audience. The platform has over 2 million monthly views, illustrating its popularity among readers.