Entry Monitoring
Stream Solana block entries with transaction batches and execution results
Entry Monitoring with Yellowstone gRPC
This guide demonstrates how to use Yellowstone gRPC to monitor block entries on Solana. Block entries represent the fundamental units of execution on the Solana blockchain, containing transaction batches and their execution results. You'll learn how to track entries with reliable reconnection handling.
TypeScript Implementation
import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';
/**
 * Owns a single Yellowstone gRPC subscription stream and keeps it alive.
 *
 * Responsibilities:
 * - open the stream and send the subscribe request,
 * - forward every update (with binary fields base58-encoded) to `dataHandler`,
 * - send a keep-alive ping every 30 seconds while connected,
 * - reconnect with a capped linear backoff after a failure or disconnect.
 */
class GrpcStreamManager {
  private client: Client;
  private stream: any;
  private isConnected: boolean = false;
  private reconnectAttempts: number = 0;
  private readonly maxReconnectAttempts: number = 10;
  private readonly reconnectInterval: number = 5000; // 5 seconds
  private readonly dataHandler: (data: any) => void;
  /** Handle of the keep-alive interval, so it can be cleared before another one is started. */
  private pingTimer: ReturnType<typeof setInterval> | undefined;
  /** Handle of the pending reconnect, so concurrent triggers schedule only one attempt. */
  private reconnectTimer: ReturnType<typeof setTimeout> | undefined;

  /**
   * @param endpoint gRPC endpoint passed to the Yellowstone client.
   * @param authToken x-token passed to the Yellowstone client.
   * @param dataHandler Callback invoked with every processed update.
   */
  constructor(
    endpoint: string,
    authToken: string,
    dataHandler: (data: any) => void
  ) {
    this.client = new Client(
      endpoint,
      authToken,
      { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
    );
    this.dataHandler = dataHandler;
  }

  /**
   * Opens the stream, wires up its event handlers, sends `subscribeRequest`
   * and starts the keep-alive ping. Failures are logged and a reconnect is
   * scheduled, so the returned promise does not reject.
   */
  async connect(subscribeRequest: SubscribeRequest): Promise<void> {
    try {
      const stream = await this.client.subscribe();
      this.stream = stream;
      this.isConnected = true;

      stream.on("data", this.handleData.bind(this));
      stream.on("error", this.handleError.bind(this));
      // "end" and "close" can both fire for one disconnect; passing the stream
      // lets handleDisconnect ignore the second event and stale streams.
      const onDisconnect = () => this.handleDisconnect(subscribeRequest, stream);
      stream.on("end", onDisconnect);
      stream.on("close", onDisconnect);

      await this.write(subscribeRequest);
      this.startPing();
    } catch (error) {
      console.error("Connection error:", error);
      this.isConnected = false;
      this.stopPing();
      await this.reconnect(subscribeRequest);
    }
  }

  /** Writes one request to the stream; rejects if there is no stream or the write fails. */
  private async write(req: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      if (!this.stream) {
        reject(new Error("Stream is not connected"));
        return;
      }
      this.stream.write(req, (err: any) => err ? reject(err) : resolve());
    });
  }

  /**
   * Schedules one reconnect attempt after `reconnectInterval * min(attempt, 5)`
   * milliseconds. Does nothing if an attempt is already pending, and gives up
   * once `maxReconnectAttempts` consecutive attempts have been made.
   */
  private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.reconnectTimer !== undefined) {
      return; // an attempt is already scheduled
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

    const delay = this.reconnectInterval * Math.min(this.reconnectAttempts, 5);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      this.connect(subscribeRequest).catch((error: unknown) => {
        console.error("Reconnection failed:", error);
      });
    }, delay);
  }

  /** (Re)starts the 30-second keep-alive ping, replacing any previous interval. */
  private startPing(): void {
    this.stopPing();
    this.pingTimer = setInterval(() => {
      if (this.isConnected) {
        this.write({
          ping: { id: 1 },
          accounts: {},
          accountsDataSlice: [],
          transactions: {},
          blocks: {},
          blocksMeta: {},
          entry: {},
          slots: {},
          transactionsStatus: {},
        }).catch(console.error);
      }
    }, 30000);
  }

  /** Stops the keep-alive ping if one is running. */
  private stopPing(): void {
    if (this.pingTimer !== undefined) {
      clearInterval(this.pingTimer);
      this.pingTimer = undefined;
    }
  }

  /** Converts binary fields and forwards the update; handler errors are logged, not rethrown. */
  private handleData(data: any): void {
    // Reset the retry budget only once the server has actually sent something:
    // obtaining a stream object does not by itself show the connection works.
    this.reconnectAttempts = 0;
    try {
      const processed = this.processBuffers(data);
      this.dataHandler(processed);
    } catch (error) {
      console.error("Error processing data:", error);
    }
  }

  /** Logs a stream error and marks the manager as disconnected. */
  private handleError(error: any): void {
    console.error("Stream error:", error);
    this.isConnected = false;
  }

  /**
   * Handles the end of a stream and schedules a reconnect.
   *
   * @param stream The stream that emitted the event. When given and it is not
   *   the current stream (already handled, or superseded by a newer one), the
   *   event is ignored.
   */
  private handleDisconnect(subscribeRequest: SubscribeRequest, stream?: unknown): void {
    if (stream !== undefined && stream !== this.stream) {
      return;
    }
    console.log("Stream disconnected");
    this.isConnected = false;
    this.stream = undefined;
    this.stopPing();
    void this.reconnect(subscribeRequest);
  }

  /**
   * Recursively walks `obj` and returns a copy in which every Buffer or
   * Uint8Array is replaced by its base58 string. Falsy values and
   * non-object primitives are returned unchanged.
   */
  private processBuffers(obj: any): any {
    if (!obj) return obj;
    if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
      return bs58.default.encode(obj);
    }
    if (Array.isArray(obj)) {
      return obj.map(item => this.processBuffers(item));
    }
    if (typeof obj === 'object') {
      return Object.fromEntries(
        Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
      );
    }
    return obj;
  }
}
// Entry monitoring implementation
/**
 * Builds an entries-only subscription at CONFIRMED commitment and streams
 * the updates to `handleEntryUpdate` through a `GrpcStreamManager`.
 */
async function monitorEntries() {
  // Only the `entry` map carries a filter; every other filter stays empty.
  const request: SubscribeRequest = {
    accounts: {},
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {
      entrySubscribe: {} // Subscribe to all entries
    },
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
  };

  const streamManager = new GrpcStreamManager(
    "your-grpc-url:2053",
    "your-x-token",
    handleEntryUpdate
  );

  console.log('Starting entry monitoring...');
  await streamManager.connect(request);
}
/**
 * Prints a human-readable summary of an entry update.
 *
 * Updates without an `entry` payload are ignored. Optional fields are printed
 * only when present; the starting transaction index is printed only when it
 * is greater than zero, matching the Rust handler in this guide.
 *
 * @param data A stream update as delivered by the stream manager.
 */
function handleEntryUpdate(data: any): void {
  const entry = data?.entry;
  if (!entry) {
    return;
  }

  console.log('\n=== Entry Details ===');
  console.log(`Slot: ${entry.slot}`);
  if (entry.index !== undefined) {
    console.log(`Entry Index: ${entry.index}`);
  }
  if (entry.numHashes !== undefined) {
    console.log(`Number of Hashes: ${entry.numHashes}`);
  }
  if (entry.hash) {
    console.log(`Hash: ${entry.hash}`);
  }
  if (entry.executedTransactionCount !== undefined) {
    console.log(`Executed Transaction Count: ${entry.executedTransactionCount}`);
  }
  // Number() accepts both numeric and string-encoded values; undefined becomes NaN and is skipped.
  if (Number(entry.startingTransactionIndex) > 0) {
    console.log(`Starting Transaction Index: ${entry.startingTransactionIndex}`);
  }

  if (entry.transactions?.length > 0) {
    console.log('\n=== Entry Transactions ===');
    console.log(`Transaction Count: ${entry.transactions.length}`);
    entry.transactions.forEach((tx: any, index: number) => {
      console.log(`\nTransaction ${index + 1}:`);
      if (tx.signature) {
        console.log(` Signature: ${tx.signature}`);
      }
      if (tx.isVote !== undefined) {
        console.log(` Is Vote: ${tx.isVote}`);
      }
    });
  }

  if (entry.tick !== undefined) {
    console.log('\n=== Tick Information ===');
    console.log(`Is Tick: ${entry.tick}`);
  }

  console.log('\n' + '='.repeat(50) + '\n');
}
// Kick off monitoring; a rejection from monitorEntries is reported on stderr.
monitorEntries().catch((err) => console.error(err));
Entry Subscription Options
1. Monitor All Entries
// Subscribes to every entry. The key under `entry` ("allEntries") is only a
// label for this filter. The remaining filter maps are left empty, as in the
// full example above, so the object is a complete SubscribeRequest.
const subscribeRequest: SubscribeRequest = {
  entry: {
    allEntries: {}
  },
  accounts: {},
  accountsDataSlice: [],
  slots: {},
  blocks: {},
  blocksMeta: {},
  transactions: {},
  transactionsStatus: {},
  commitment: CommitmentLevel.CONFIRMED,
};
2. Monitor Filtered Entries
// The entry filter message carries no options (the Rust example in this guide
// builds it as `SubscribeRequestFilterEntry {}`), so there is no server-side
// `filter` block to set, e.g. for excluding votes or failed transactions.
// Subscribe under a label and apply any filtering inside your data handler.
const subscribeRequest: SubscribeRequest = {
  entry: {
    filteredEntries: {}
  },
  accounts: {},
  accountsDataSlice: [],
  slots: {},
  blocks: {},
  blocksMeta: {},
  transactions: {},
  transactionsStatus: {},
  commitment: CommitmentLevel.CONFIRMED,
};
Data Structures
Entry Update:
Slot number (current slot being processed)
Entry index (position in the block)
Number of hashes (Proof of History hashes computed since the previous entry)
Entry hash (unique identifier)
Transaction count (number of transactions in entry)
Transactions array (detailed transaction information)
Transaction Details:
Signature (unique transaction identifier)
Vote status (whether it's a vote transaction)
Transaction message
Account keys (involved accounts)
Header information
Instructions (program calls and data)
Recent blockhash
Transaction metadata
Status (success/failure)
Fee amount
Balance changes
Log messages
Rust Implementation
use {
anyhow::Result, futures::{sink::SinkExt, stream::StreamExt}, hex, log::error, solana_sdk::pubkey::Pubkey, std::{collections::HashMap, sync::Arc, time::Duration}, tokio::sync::Mutex, tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint}, tonic_health::pb::health_client::HealthClient, yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken}, yellowstone_grpc_proto::{
geyser::{
geyser_client::GeyserClient, subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, SubscribeUpdateBlock, SubscribeUpdateEntry, SubscribeUpdateSlot, CommitmentLevel
},
prelude::SubscribeRequestPing,
}
};
/// Manager for handling gRPC stream connections and updates
///
/// Holds the Geyser client together with the bookkeeping used for
/// reconnection: a connected flag, the number of reconnect attempts made so
/// far, the maximum allowed, and the base delay between attempts.
struct GrpcStreamManager {
/// Geyser gRPC client used to open the subscription stream.
client: GeyserGrpcClient<InterceptorXToken>,
/// Set to true once a subscription has been opened, false after a stream error.
is_connected: bool,
/// Reconnect attempts made so far; reset to 0 when a subscription is opened.
reconnect_attempts: u32,
/// Upper bound on `reconnect_attempts` before reconnecting is abandoned.
max_reconnect_attempts: u32,
/// Base backoff delay; multiplied by min(attempt number, 5) before each attempt.
reconnect_interval: Duration,
}
impl GrpcStreamManager {
/// Creates a new GrpcStreamManager instance
async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<Self>>> {
let interceptor = InterceptorXToken {
x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
x_request_snapshot: true,
};
let channel = Endpoint::from_shared(endpoint.to_string())?
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.connect()
.await
.map_err(|e| anyhow::Error::from(e))?;
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor),
);
Ok(Arc::new(Mutex::new(Self {
client,
is_connected: false,
reconnect_attempts: 0,
max_reconnect_attempts: 10,
reconnect_interval: Duration::from_secs(5),
})))
}
/// Handles entry update messages from the gRPC stream
fn handle_entry_update(&self, entry: &SubscribeUpdateEntry) {
println!("\n=== Entry Details ===");
println!("Slot: {}", entry.slot);
println!("Entry Index: {}", entry.index);
println!("Number of Hashes: {}", entry.num_hashes);
if !entry.hash.is_empty() {
println!("Hash: {}", hex::encode(&entry.hash));
}
println!("Executed Transaction Count: {}", entry.executed_transaction_count);
if entry.starting_transaction_index > 0 {
println!("Starting Transaction Index: {}", entry.starting_transaction_index);
}
println!("\n{}\n", "=".repeat(50));
}
/// Establishes connection and handles the subscription stream
async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
let request = request.clone();
let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;
self.is_connected = true;
self.reconnect_attempts = 0;
while let Some(message) = stream.next().await {
match message {
Ok(msg) => {
match msg.update_oneof {
Some(UpdateOneof::Entry(entry)) => {
self.handle_entry_update(&entry);
}
Some(UpdateOneof::Ping(_)) => {
subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await?;
}
Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
_ => {
println!("Other update received: {:?}", msg);
}
}
}
Err(err) => {
error!("Error: {:?}", err);
self.is_connected = false;
Box::pin(self.reconnect(request.clone())).await?;
break;
}
}
}
Ok(())
}
/// Attempts to reconnect when the connection is lost
async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
if self.reconnect_attempts >= self.max_reconnect_attempts {
println!("Max reconnection attempts reached");
return Ok(());
}
self.reconnect_attempts += 1;
println!("Reconnecting... Attempt {}", self.reconnect_attempts);
let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
tokio::time::sleep(backoff).await;
Box::pin(self.connect(request)).await
}
}
/// Entry point: builds the stream manager, subscribes to entries at
/// confirmed commitment, and runs the stream until it stops.
#[tokio::main]
async fn main() -> Result<()> {
    let manager = GrpcStreamManager::new("your-grpc-url:2053", "your-x-token").await?;
    let mut guard = manager.lock().await;

    // One entry filter, registered under the label "entry".
    let mut entry_filters = HashMap::new();
    entry_filters.insert("entry".to_string(), SubscribeRequestFilterEntry {});

    let request = SubscribeRequest {
        entry: entry_filters,
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting entry monitoring...");

    // Report a failure before handing it back to the runtime.
    guard.connect(request).await.map_err(|e| {
        println!("Subscription error: {:?}", e);
        e
    })
}
Go Implementation
package main
import (
	"context"
	"crypto/tls"
	"encoding/hex"
	"fmt"
	"log"
	"reflect"
	"sync"
	"time"

	pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)
// handleEntryUpdate prints the header fields of an entry update and then, when
// a transaction list is present, one summary per transaction.
//
// NOTE(review): the Rust handler in this guide reads only slot, index,
// num_hashes, hash, executed_transaction_count and starting_transaction_index.
// Confirm that pb.SubscribeUpdateEntry really exposes Transactions.
func handleEntryUpdate(entry *pb.SubscribeUpdateEntry) {
	fmt.Printf("Entry Update:\n")
	fmt.Printf(" Slot: %d\n", entry.Slot)
	fmt.Printf(" Index: %d\n", entry.Index)
	fmt.Printf(" Num Hashes: %d\n", entry.NumHashes)
	fmt.Printf(" Hash: %s\n", hex.EncodeToString(entry.Hash))
	fmt.Printf(" Transaction Count: %d\n", entry.ExecutedTransactionCount)

	txs := entry.Transactions
	if txs == nil {
		return
	}

	fmt.Printf("Processing %d transactions...\n", len(txs))
	for pos, txInfo := range txs {
		fmt.Printf("Transaction %d:\n", pos+1)
		fmt.Printf(" Signature: %s\n", hex.EncodeToString(txInfo.Signature))
		fmt.Printf(" Is Vote: %v\n", txInfo.IsVote)

		// Status, fee and balances are only available when metadata is attached.
		meta := txInfo.Meta
		if meta == nil {
			continue
		}
		fmt.Printf(" Status: %s\n", getTransactionStatus(meta.Err))
		fmt.Printf(" Fee: %d\n", meta.Fee)
		fmt.Printf(" Pre Balances: %v\n", meta.PreBalances)
		fmt.Printf(" Post Balances: %v\n", meta.PostBalances)
	}
}
// getTransactionStatus returns "Success" when err carries no error and
// "Failed" otherwise.
//
// err counts as "no error" when it is the nil interface or when it holds a
// nil pointer, map, slice, func or channel. The second case matters because a
// nil pointer stored in an interface{} does not compare equal to nil, so a
// caller passing an unset pointer field would otherwise always get "Failed".
func getTransactionStatus(err interface{}) string {
	if err == nil {
		return "Success"
	}

	switch v := reflect.ValueOf(err); v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Func, reflect.Chan:
		if v.IsNil() {
			return "Success"
		}
	}
	return "Failed"
}
// main creates the stream manager, subscribes to entries at confirmed
// commitment, and then blocks forever while updates are delivered to
// handleEntryUpdate.
func main() {
	ctx := context.Background()

	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager(
		"your-dedicated-node-url:2053",
		"x-token",
		handleEntryUpdate,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Create subscription request for entry monitoring.
	// As in the Rust example in this guide, the request holds a map from a
	// caller-chosen label to an entry filter, the filter message itself has no
	// fields, and the commitment level is optional (hence the pointer).
	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Entry: map[string]*pb.SubscribeRequestFilterEntry{
			"entry": {},
		},
		Commitment: &commitment,
	}

	// Connect and handle updates
	if err := manager.Connect(ctx, req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}
Last updated
Was this helpful?