# Examples

Practical examples demonstrating common SDK usage patterns.
## Basic File Operations

### Reading and Writing Files
```python
from mcp_tool_kit import MCPToolKitSDK

with MCPToolKitSDK() as sdk:
    # Read a file
    result = sdk.call_tool("read_file", {"path": "input.txt"})
    if result.success:
        content = result.data

        # Process content
        processed = content.upper()

        # Write to new file
        result = sdk.call_tool("write_file", {
            "path": "output.txt",
            "content": processed
        })
```
### Using FileOperations Helper
```python
import json

with MCPToolKitSDK() as sdk:
    file = sdk.file("data.json")

    # Read JSON data
    json_str = file.read()
    if json_str:
        data = json.loads(json_str)

        # Modify data
        data['updated'] = True

        # Write back
        file.write(json.dumps(data, indent=2))
```
## Batch Processing

### Processing Multiple Files
```python
def process_log_files(sdk, log_dir):
    """Process all log files in a directory."""
    # Get list of log files
    result = sdk.call_tool("list_directory", {"path": log_dir})
    if not result.success:
        return []

    log_files = [f for f in result.data if f.endswith('.log')]

    # Create batch operations
    operations = [
        {"tool": "read_file", "params": {"path": f"{log_dir}/{file}"}}
        for file in log_files
    ]

    # Execute batch
    results = sdk.batch_call(operations)

    # Process results
    summaries = []
    for file, result in zip(log_files, results):
        if result.success:
            lines = result.data.split('\n')
            errors = [l for l in lines if 'ERROR' in l]
            summaries.append({
                'file': file,
                'total_lines': len(lines),
                'errors': len(errors)
            })
    return summaries
```
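
A possible invocation (the `logs` directory name is illustrative):

```python
with MCPToolKitSDK() as sdk:
    summaries = process_log_files(sdk, "logs")
    for summary in summaries:
        print(f"{summary['file']}: {summary['errors']} errors "
              f"in {summary['total_lines']} lines")
```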
### Parallel Processing with Async
```python
import asyncio

async def process_urls_async(urls):
    """Fetch multiple URLs concurrently."""
    async with MCPToolKitSDK(async_mode=True) as sdk:
        operations = [
            {"tool": "fetch", "params": {"url": url, "method": "GET"}}
            for url in urls
        ]
        results = await sdk.batch_call_async(operations)

        successful = []
        failed = []
        for url, result in zip(urls, results):
            if result.success:
                successful.append((url, len(str(result.data))))
            else:
                failed.append((url, result.error))
        return successful, failed

# Usage
urls = [
    "https://api.github.com/users/github",
    "https://api.github.com/users/torvalds",
    "https://api.github.com/users/gvanrossum"
]
successful, failed = asyncio.run(process_urls_async(urls))
```
## Error Handling and Retry

### Custom Retry Logic
```python
import time

def reliable_file_operation(sdk, path, max_retries=5):
    """Read file with custom retry logic."""
    for attempt in range(max_retries):
        result = sdk.call_tool("read_file", {"path": path}, retry=1)
        if result.success:
            return result.data

        if "not found" in result.error.lower():
            # File doesn't exist, no point retrying
            raise FileNotFoundError(f"File {path} not found")

        if attempt < max_retries - 1:
            # Wait before retry with exponential backoff
            time.sleep(2 ** attempt)
            print(f"Retry {attempt + 1}/{max_retries} for {path}")

    raise Exception(f"Failed to read {path} after {max_retries} attempts")
```
### Error Recovery
```python
def safe_file_update(sdk, path, update_func):
    """Safely update a file with backup."""
    backup_path = f"{path}.backup"
    try:
        # Read original
        result = sdk.call_tool("read_file", {"path": path})
        if not result.success:
            raise Exception(f"Cannot read {path}: {result.error}")
        original_content = result.data

        # Create backup
        result = sdk.call_tool("write_file", {
            "path": backup_path,
            "content": original_content
        })
        if not result.success:
            raise Exception("Failed to create backup")

        # Apply update
        new_content = update_func(original_content)

        # Write updated content
        result = sdk.call_tool("write_file", {
            "path": path,
            "content": new_content
        })
        if result.success:
            # Remove backup on success
            sdk.call_tool("delete_file", {"path": backup_path})
        else:
            # Restore from backup on failure
            sdk.call_tool("write_file", {
                "path": path,
                "content": original_content
            })
            raise Exception("Update failed, restored from backup")
    except Exception as e:
        print(f"Error: {e}")
        raise
```
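
Any callable that maps the old content to the new content can serve as `update_func`; for example (the file name and setting are illustrative):

```python
with MCPToolKitSDK() as sdk:
    safe_file_update(
        sdk,
        "settings.ini",
        lambda text: text.replace("debug = true", "debug = false")
    )
```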
## Middleware and Events

### Authentication Middleware
```python
import logging

class AuthenticatedSDK:
    def __init__(self, api_key):
        self.sdk = MCPToolKitSDK()
        self.api_key = api_key

        # Add authentication to all requests
        self.sdk.add_middleware(self._auth_middleware)

        # Log all errors
        self.sdk.on('error', self._log_error)

    def _auth_middleware(self, tool_name, params):
        # Add API key to web requests
        if tool_name in ['fetch', 'web_request']:
            if 'headers' not in params:
                params['headers'] = {}
            params['headers']['Authorization'] = f"Bearer {self.api_key}"
        return params

    def _log_error(self, tool_name, params, error):
        logging.error(f"Tool {tool_name} failed: {error}")
```
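
The wrapper is then used through its `sdk` attribute; the key and URL below are placeholders:

```python
client = AuthenticatedSDK(api_key="YOUR_API_KEY")  # placeholder key
result = client.sdk.call_tool("fetch", {
    "url": "https://api.example.com/data",  # illustrative endpoint
    "method": "GET"
})
```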
### Request Logging
```python
import logging
import time

def setup_request_logging(sdk):
    """Add comprehensive logging to SDK calls."""
    logger = logging.getLogger("mcp_sdk")

    # Track request timing
    request_times = {}

    def before_request(tool_name, params):
        request_id = f"{tool_name}_{time.time()}"
        request_times[request_id] = time.time()
        logger.info(f"Starting {tool_name} with params: {params}")
        return request_id

    def after_request(tool_name, params, result):
        # Find an in-flight request for this tool (simple prefix match)
        for req_id, start_time in request_times.items():
            if req_id.startswith(tool_name):
                duration = time.time() - start_time
                logger.info(f"{tool_name} completed in {duration:.2f}s")
                del request_times[req_id]
                break

    sdk.on('before_call', before_request)
    sdk.on('after_call', after_request)
    sdk.on('error', lambda t, p, e: logger.error(f"{t} error: {e}"))
```
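
Enable it once after constructing the SDK; the `basicConfig` call is only needed if no logging handlers are configured yet:

```python
logging.basicConfig(level=logging.INFO)

with MCPToolKitSDK() as sdk:
    setup_request_logging(sdk)
    sdk.call_tool("read_file", {"path": "input.txt"})
```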
## Git Integration

### Automated Git Workflow
```python
def auto_commit_changes(sdk, repo_path, message_prefix="Auto-update"):
    """Automatically commit all changes in a repository."""
    git = sdk.git(repo_path)

    # Check status
    status = git.status()
    if "nothing to commit" in status:
        print("No changes to commit")
        return False

    # Get list of changed files
    result = sdk.call_tool("git_diff", {"repo_path": repo_path})
    if result.success:
        # Parse changed files from the diff (helper sketched below)
        changed_files = parse_changed_files(result.data)

        # Generate commit message
        message = f"{message_prefix}: Updated {len(changed_files)} files"

        # Commit changes
        if git.commit(message, files=changed_files):
            print(f"Committed: {message}")
            return True
    return False
```
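
The workflow above assumes a `parse_changed_files` helper. A minimal sketch, assuming `git_diff` returns standard unified diff text:

```python
def parse_changed_files(diff_text):
    """Extract changed file paths from 'diff --git a/... b/...' headers."""
    files = []
    for line in diff_text.split('\n'):
        if line.startswith('diff --git'):
            # Take the 'b/' side of the header and strip its prefix
            path = line.split()[-1]
            files.append(path[2:] if path.startswith('b/') else path)
    return files
```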
## Data Processing Pipeline

### CSV Processing Example
```python
import csv
import io

def process_csv_files(sdk, input_dir, output_dir):
    """Process all CSV files in a directory."""
    # Ensure output directory exists
    sdk.call_tool("create_directory", {"path": output_dir})

    # Get all CSV files
    result = sdk.call_tool("list_directory", {"path": input_dir})
    if not result.success:
        return

    csv_files = [f for f in result.data if f.endswith('.csv')]

    for csv_file in csv_files:
        # Read CSV
        file_path = f"{input_dir}/{csv_file}"
        result = sdk.call_tool("read_file", {"path": file_path})
        if result.success:
            # Parse CSV
            reader = csv.DictReader(io.StringIO(result.data))
            rows = list(reader)

            # Process data (example: convert to uppercase)
            for row in rows:
                for key in row:
                    if isinstance(row[key], str):
                        row[key] = row[key].upper()

            # Write processed CSV
            output = io.StringIO()
            if rows:
                writer = csv.DictWriter(output, fieldnames=rows[0].keys())
                writer.writeheader()
                writer.writerows(rows)

            # Save to output directory
            output_path = f"{output_dir}/{csv_file}"
            sdk.call_tool("write_file", {
                "path": output_path,
                "content": output.getvalue()
            })
            print(f"Processed {csv_file} -> {output_path}")
```
## Custom Integration

### Building a File Sync Service
```python
import hashlib

class FileSyncService:
    """Sync files between local and remote locations."""

    def __init__(self, local_dir, remote_url):
        self.sdk = MCPToolKitSDK()
        self.local_dir = local_dir
        self.remote_url = remote_url

        # Track sync status by checksum
        self.synced_files = {}

    def sync_file(self, filename):
        """Sync a single file."""
        local_path = f"{self.local_dir}/{filename}"

        # Read local file
        file = self.sdk.file(local_path)
        content = file.read()
        if not content:
            return False

        # Calculate checksum
        checksum = hashlib.md5(content.encode()).hexdigest()

        # Check if file needs sync
        if self.synced_files.get(filename) == checksum:
            return True  # Already synced

        # Upload to remote
        web = self.sdk.web()
        response = web.post(
            f"{self.remote_url}/upload",
            {
                "filename": filename,
                "content": content,
                "checksum": checksum
            }
        )
        if response:
            self.synced_files[filename] = checksum
            return True
        return False

    def sync_directory(self):
        """Sync all files in the directory."""
        result = self.sdk.call_tool("list_directory", {"path": self.local_dir})
        if result.success:
            files = [f for f in result.data if not f.startswith('.')]
            success_count = 0
            for file in files:
                if self.sync_file(file):
                    success_count += 1
                    print(f"Synced: {file}")
                else:
                    print(f"Failed: {file}")
            print(f"Synced {success_count}/{len(files)} files")
```
## Testing with the SDK

### Unit Test Helper
```python
import json
import unittest
from unittest.mock import Mock

class TestWithSDK(unittest.TestCase):
    """Base test class with SDK mocking."""

    def setUp(self):
        self.sdk = MCPToolKitSDK()
        self.sdk.client = Mock()

    def mock_tool_response(self, tool_name, response_data):
        """Mock a successful tool response."""
        self.sdk.client.call_tool.return_value = json.dumps(response_data)

    def mock_tool_error(self, tool_name, error_message):
        """Mock a tool error."""
        self.sdk.client.call_tool.return_value = json.dumps({
            "error": error_message
        })

class TestFileOperations(TestWithSDK):
    def test_read_file(self):
        # Mock response
        self.mock_tool_response("read_file", "file contents")

        # Test
        file = self.sdk.file("test.txt")
        content = file.read()

        # Assert
        self.assertEqual(content, "file contents")
        self.sdk.client.call_tool.assert_called_with(
            "read_file",
            {"path": "test.txt"}
        )
```
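
To make the test module runnable as a script:

```python
if __name__ == "__main__":
    unittest.main()
```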