Migrate end-to-end test to latest cloud SDKs
Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
@@ -19,9 +19,10 @@ package integration
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
eventhub "github.com/Azure/azure-event-hubs-go/v3"
|
||||
azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
|
||||
"github.com/fluxcd/pkg/git"
|
||||
"github.com/fluxcd/test-infra/tftestenv"
|
||||
tfjson "github.com/hashicorp/terraform-json"
|
||||
@@ -148,27 +149,47 @@ func registryLoginACR(ctx context.Context, output map[string]*tfjson.StateOutput
|
||||
}
|
||||
|
||||
func setupEventHubHandler(ctx context.Context, c chan []byte, eventHubSas string) (func(), error) {
|
||||
hub, err := eventhub.NewHubFromConnectionString(eventHubSas)
|
||||
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubSas, "", azeventhubs.DefaultConsumerGroup, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
handler := func(ctx context.Context, event *eventhub.Event) error {
|
||||
c <- event.Data
|
||||
return nil
|
||||
}
|
||||
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
|
||||
props, err := consumerClient.GetEventHubProperties(ctx, nil)
|
||||
if err != nil {
|
||||
consumerClient.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
listenerHandler, err := hub.Receive(ctx, runtimeInfo.PartitionIDs[0], handler, eventhub.ReceiveWithLatestOffset())
|
||||
|
||||
latest := true
|
||||
partitionClient, err := consumerClient.NewPartitionClient(props.PartitionIDs[0], &azeventhubs.PartitionClientOptions{
|
||||
StartPosition: azeventhubs.StartPosition{Latest: &latest},
|
||||
})
|
||||
if err != nil {
|
||||
consumerClient.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
receiveCtx, cancel := context.WithCancel(ctx)
|
||||
go func() {
|
||||
for {
|
||||
events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
|
||||
if err != nil {
|
||||
if receiveCtx.Err() != nil {
|
||||
return
|
||||
}
|
||||
log.Printf("error receiving event hub events: %s\n", err)
|
||||
return
|
||||
}
|
||||
for _, event := range events {
|
||||
c <- event.Body
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
closefn := func() {
|
||||
listenerHandler.Close(ctx)
|
||||
hub.Close(ctx)
|
||||
cancel()
|
||||
partitionClient.Close(ctx)
|
||||
consumerClient.Close(ctx)
|
||||
}
|
||||
|
||||
return closefn, nil
|
||||
|
||||
Reference in New Issue
Block a user